diff --git a/Cargo.lock b/Cargo.lock index 71a291c9669..e9c7860a1e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2649,7 +2649,7 @@ dependencies = [ "quick-protobuf", "quick-protobuf-codec", "quickcheck-ext", - "rand 0.8.5", + "rand 0.9.0", "regex", "serde", "sha2", diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index e29de924ae6..ae964d87a9d 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -5,6 +5,10 @@ - Fix underflow when shuffling peers after prunning. See [PR 6183](https://github.com/libp2p/rust-libp2p/pull/6183) + +- Implement gossipsub 1.3 extensions control message. + See [PR 6119](https://github.com/libp2p/rust-libp2p/pull/6119) + - Remove peer penalty for duplicate messages. See [PR 6112](https://github.com/libp2p/rust-libp2p/pull/6112) @@ -26,6 +30,9 @@ - Switch the internal `async-channel` used to dispatch messages from `NetworkBehaviour` to the `ConnectionHandler` with an internal priority queue. See [PR 6175](https://github.com/libp2p/rust-libp2p/pull/6175) +- Switch the internal `async-channel` used to dispatch messages from `NetworkBehaviour` to the `ConnectionHandler` + with an internal priority queue. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX) + - gossipsub: do early return in for an empty input See [PR 6208](https://github.com/libp2p/rust-libp2p/pull/6208). diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 911b453f477..cda14ffaab7 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] [features] wasm-bindgen = ["getrandom/js", "futures-timer/wasm-bindgen"] metrics = ["prometheus-client"] +partial_messages = [] [dependencies] async-channel = "2.3.1" @@ -33,7 +34,7 @@ libp2p-identity = { workspace = true, features = ["rand"] } libp2p-swarm = { workspace = true } quick-protobuf = "0.8" quick-protobuf-codec = { workspace = true } -rand = "0.8" +rand = "0.9" regex = "1.10.5" serde = { version = "1", optional = true, features = ["derive"] } sha2 = "0.10.8" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index cdf160b4aaf..baff9368fe1 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -48,7 +48,7 @@ use libp2p_swarm::{ #[cfg(feature = "metrics")] use prometheus_client::registry::Registry; use quick_protobuf::{MessageWrite, Writer}; -use rand::{seq::SliceRandom, thread_rng}; +use rand::{rng, seq::SliceRandom}; use web_time::{Instant, SystemTime}; #[cfg(feature = "metrics")] @@ -68,12 +68,17 @@ use crate::{ topic::{Hasher, Topic, TopicHash}, transform::{DataTransform, IdentityTransform}, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId, - PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, + MessageId, PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction, }, FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError, }; +#[cfg(feature = "partial_messages")] +use crate::{ + extensions::partial_messages::{self, Partial, PublishAction, ReceivedAction}, + types::SubscriptionOpts, +}; #[cfg(test)] mod tests; @@ -141,6 +146,20 @@ pub enum Event { /// The decompressed message itself. message: Message, }, + /// A new partial message has been received. + #[cfg(feature = "partial_messages")] + Partial { + /// The topic of the partial message. + topic_hash: TopicHash, + /// The peer that forwarded us this message. + peer_id: PeerId, + /// The group ID that identifies the complete logical message. + group_id: Vec, + /// The partial message data. + message: Option>, + /// The partial message metadata, what peer has and wants. + metadata: Option>, + }, /// A remote subscribed to a topic. Subscribed { /// Remote that has subscribed. @@ -290,6 +309,10 @@ pub struct Behaviour { /// Overlay network of connected peers - Maps topics to connected gossipsub peers. mesh: HashMap>, + /// Partial Messages extension handler. + #[cfg(feature = "partial_messages")] + partial_messages_extension: partial_messages::State, + /// Map of topics to list of peers that we publish to, but don't subscribe to. fanout: HashMap>, @@ -453,6 +476,8 @@ where data_transform, failed_messages: Default::default(), gossip_promises: Default::default(), + #[cfg(feature = "partial_messages")] + partial_messages_extension: Default::default(), }) } @@ -514,9 +539,35 @@ where /// Subscribe to a topic. /// - /// Returns [`Ok(true)`](Ok) if the subscription worked. Returns [`Ok(false)`](Ok) if we were - /// already subscribed. + /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already + /// subscribed. pub fn subscribe(&mut self, topic: &Topic) -> Result { + self.subscribe_inner(topic, false, false) + } + + /// Subscribe to a topic with partial options. + /// + /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already + /// subscribed. + #[cfg(feature = "partial_messages")] + pub fn subscribe_partial( + &mut self, + topic: &Topic, + requests_partial: bool, + ) -> Result { + self.subscribe_inner(topic, true, requests_partial) + } + + /// Subscribe to a topic. + /// + /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already + /// subscribed. + fn subscribe_inner( + &mut self, + topic: &Topic, + supports_partial: bool, + requests_partial: bool, + ) -> Result { let topic_hash = topic.hash(); if !self.subscription_filter.can_subscribe(&topic_hash) { return Err(SubscriptionError::NotAllowed); @@ -530,13 +581,23 @@ where // send subscription request to all peers for peer_id in self.connected_peers.keys().copied().collect::>() { tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer"); - let event = RpcOut::Subscribe(topic_hash.clone()); + + let event = RpcOut::Subscribe { + topic: topic_hash.clone(), + requests_partial, + supports_partial, + }; self.send_message(peer_id, event); } // call JOIN(topic) // this will add new peers to the mesh for the topic self.join(&topic_hash); + + #[cfg(feature = "partial_messages")] + self.partial_messages_extension + .subscribe(topic_hash, supports_partial, requests_partial); + tracing::debug!(%topic, "Subscribed to topic"); Ok(true) } @@ -564,6 +625,10 @@ where // this will remove the topic from the mesh self.leave(&topic_hash); + #[cfg(feature = "partial_messages")] + self.partial_messages_extension + .unsubscribe(&topic_hash.clone()); + tracing::debug!(topic=%topic_hash, "Unsubscribed from topic"); true } @@ -592,7 +657,6 @@ where return Err(PublishError::MessageTooLarge); } - let mesh_n = self.config.mesh_n_for_topic(&topic); let raw_message = self.build_raw_message(topic, transformed_data)?; // calculate the message id from the un-transformed data @@ -618,112 +682,22 @@ where let topic_hash = raw_message.topic.clone(); - let mut peers_on_topic = self - .connected_peers - .iter() - .filter(|(_, p)| p.topics.contains(&topic_hash)) - .map(|(peer_id, _)| peer_id) - .peekable(); - - if peers_on_topic.peek().is_none() { - return Err(PublishError::NoPeersSubscribedToTopic); - } + let candidates = self.publish_peers(&topic_hash); - let mut recipient_peers = HashSet::new(); - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend(peers_on_topic.filter(|p| { - self.explicit_peers.contains(*p) - || !self - .peer_score - .below_threshold(p, |ts| ts.publish_threshold) - .0 - })); - } else { - match self.mesh.get(&topic_hash) { - // Mesh peers - Some(mesh_peers) => { - // We have a mesh set. We want to make sure to publish to at least `mesh_n` - // peers (if possible). - let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len()); - - if needed_extra_peers > 0 { - // We don't have `mesh_n` peers in our mesh, we will randomly select extras - // and publish to them. - - // Get a random set of peers that are appropriate to send messages too. - let peer_list = get_random_peers( - &self.connected_peers, - &topic_hash, - needed_extra_peers, - |peer| { - !mesh_peers.contains(peer) - && !self.explicit_peers.contains(peer) - && !self - .peer_score - .below_threshold(peer, |ts| ts.publish_threshold) - .0 - }, - ); - recipient_peers.extend(peer_list); - } - - recipient_peers.extend(mesh_peers); - } - // Gossipsub peers - None => { - tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); - // `fanout_peers` is always non-empty if it's `Some`. - let fanout_peers = self - .fanout - .get(&topic_hash) - .filter(|peers| !peers.is_empty()); - // If we have fanout peers add them to the map. - if let Some(peers) = fanout_peers { - for peer in peers { - recipient_peers.insert(*peer); - } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let new_peers = - get_random_peers(&self.connected_peers, &topic_hash, mesh_n, { - |p| { - !self.explicit_peers.contains(p) - && !self - .peer_score - .below_threshold(p, |ts| ts.publish_threshold) - .0 - } - }); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); - recipient_peers.insert(peer); - } - } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); - } - } + #[cfg(feature = "partial_messages")] + let candidates: HashSet<_> = candidates + .into_iter() + .filter(|peer_id| { + !self + .partial_messages_extension + .requests_partial(peer_id, &topic_hash) + }) + .collect(); - // Explicit peers that are part of the topic - recipient_peers - .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id))); + let recipients = self.filter_publish_candidates(&topic_hash, candidates); - // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && connections.topics.contains(&topic_hash) - && !self - .peer_score - .below_threshold(peer, |ts| ts.publish_threshold) - .0 - { - recipient_peers.insert(*peer); - } - } + if recipients.is_empty() { + return Err(PublishError::NoPeersSubscribedToTopic); } // If the message isn't a duplicate and we have sent it to some peers add it to the @@ -736,7 +710,7 @@ where // Send to peers we know are subscribed to the topic. let mut publish_failed = true; - for peer_id in recipient_peers.iter() { + for peer_id in recipients.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); // If enabled, Send first an IDONTWANT so that if we are slower than forwarders // publishing the original message we don't receive it back. @@ -763,12 +737,8 @@ where } } - if recipient_peers.is_empty() { - return Err(PublishError::NoPeersSubscribedToTopic); - } - if publish_failed { - return Err(PublishError::AllQueuesFull(recipient_peers.len())); + return Err(PublishError::AllQueuesFull(recipients.len())); } tracing::debug!(message_id=%msg_id, "Published message"); @@ -781,6 +751,170 @@ where Ok(msg_id) } + /// Get all peers on the topic that have a sufficiently high score to allow publishing. + fn publish_peers(&self, topic_hash: &TopicHash) -> HashSet { + self.connected_peers + .iter() + .filter(|(_, peer)| peer.topics.contains(topic_hash)) + .filter(|(peer_id, _)| { + self.explicit_peers.contains(*peer_id) + || !self + .peer_score + .below_threshold(peer_id, |ts| ts.publish_threshold) + .0 + }) + .map(|(peer_id, _)| *peer_id) + .collect() + } + + /// Filter publish recipients from a list of candidates. + fn filter_publish_candidates( + &mut self, + topic_hash: &TopicHash, + candidates: HashSet, + ) -> HashSet { + if self.config.flood_publish() { + return candidates; + } + let mesh_n = self.config.mesh_n_for_topic(topic_hash); + let mut recipients = HashSet::new(); + // Always include explicit peers and floodsub peers from candidates + recipients.extend( + candidates + .iter() + .filter(|peer_id| { + self.explicit_peers.contains(*peer_id) + || self + .connected_peers + .get(*peer_id) + .map(|p| p.kind == PeerKind::Floodsub) + .unwrap_or(false) + }) + .copied(), + ); + match self.mesh.get(topic_hash) { + Some(mesh_peers) => { + // Filter mesh peers to those in candidates + let mesh_peers = candidates + .iter() + .filter(|peer_id| mesh_peers.contains(peer_id)) + .copied() + .collect::>(); + + let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len()); + if needed_extra_peers > 0 { + let remaining = candidates + .iter() + .filter(|peer_id| !mesh_peers.contains(peer_id)) + .filter_map(|peer_id| { + self.connected_peers + .get(peer_id) + .map(|peer| (peer_id, peer)) + }); + + let extras = + get_random_peers(remaining, topic_hash, needed_extra_peers, |_, _| true); + recipients.extend(extras); + } + recipients.extend(mesh_peers); + } + None => { + tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); + + // Filter fanout peers to those in candidates + let fanout_peers: Vec = self + .fanout + .get(topic_hash) + .map(|f| { + f.iter() + .filter(|p| candidates.contains(*p)) + .copied() + .collect() + }) + .unwrap_or_default(); + let needed_extra_peers = mesh_n.saturating_sub(fanout_peers.len()); + // If we have fanout peers add them to the map. + recipients.extend(fanout_peers); + if needed_extra_peers > 0 { + let remaining = candidates + .iter() + .filter(|peer_id| !recipients.contains(*peer_id)) + .filter_map(|peer_id| { + self.connected_peers + .get(peer_id) + .map(|peer| (peer_id, peer)) + }); + + let new_peers = + get_random_peers(remaining, topic_hash, needed_extra_peers, |_, _| true); + + tracing::debug!(?new_peers, "Peers added to fanout"); + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + recipients.extend(new_peers); + } + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); + } + } + recipients + } + + #[cfg(feature = "partial_messages")] + pub fn publish_partial( + &mut self, + topic: impl Into, + partial_message: P, + ) -> Result<(), PublishError> { + let topic_hash = topic.into(); + let candidates = self + .publish_peers(&topic_hash) + .into_iter() + .filter(|peer_id| { + self.partial_messages_extension + .supports_partial(peer_id, &topic_hash) + }) + .collect(); + + let mut recipients = self.filter_publish_candidates(&topic_hash, candidates); + + // We add the peers which also have the same group_id to the publish peers, + // this allows us to reply to peers whom may not be on our mesh but still want the partial update. + let group_id = partial_message.group_id(); + let transient_peers = self.connected_peers.keys().filter(|peer_id| { + self.partial_messages_extension + .group_id(peer_id, &topic_hash, &group_id) + }); + + recipients.extend(transient_peers); + + if recipients.is_empty() { + return Err(PublishError::NoPeersSubscribedToTopic); + } + + let publish_actions = self.partial_messages_extension.handle_publish( + topic_hash.clone(), + partial_message, + recipients, + ); + + for action in publish_actions { + match action { + PublishAction::SendMessage { peer_id, rpc } => { + self.send_message(peer_id, rpc); + } + PublishAction::PenalizePeer { + peer_id, + topic_hash, + } => { + if let PeerScoreState::Active(peer_score) = &mut self.peer_score { + peer_score.reject_invalid_partial(peer_id, &topic_hash); + } + } + } + } + Ok(()) + } + /// This function should be called when [`Config::validate_messages()`] is `true` after /// the message got validated by the caller. Messages are stored in the ['Memcache'] and /// validation is expected to be fast enough that the messages should still exist in the cache. @@ -1023,11 +1157,11 @@ where &self.connected_peers, topic_hash, mesh_n - added_peers.len(), - |peer| { - !added_peers.contains(peer) - && !self.explicit_peers.contains(peer) - && !self.peer_score.below_threshold(peer, |_| 0.0).0 - && !self.backoffs.is_backoff_with_slack(topic_hash, peer) + |peer_id, _| { + !added_peers.contains(peer_id) + && !self.explicit_peers.contains(peer_id) + && !self.peer_score.below_threshold(peer_id, |_| 0.0).0 + && !self.backoffs.is_backoff_with_slack(topic_hash, peer_id) }, ); @@ -1070,14 +1204,6 @@ where ); } - #[cfg(feature = "metrics")] - { - let mesh_peers = self.mesh_peers(topic_hash).count(); - if let Some(m) = self.metrics.as_mut() { - m.set_mesh_peers(topic_hash, mesh_peers) - } - } - tracing::debug!(topic=%topic_hash, "Completed JOIN for topic"); } @@ -1117,7 +1243,9 @@ where &self.connected_peers, topic_hash, self.config.prune_peers(), - |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0, + |peer_id, _| { + peer_id != peer && !self.peer_score.below_threshold(peer_id, |_| 0.0).0 + }, ) .into_iter() .map(|p| PeerInfo { peer_id: Some(p) }) @@ -1274,7 +1402,7 @@ where // Ask in random order let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect(); - let mut rng = thread_rng(); + let mut rng = rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask); iwant_ids_vec.truncate(iask); @@ -1521,6 +1649,32 @@ where tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer"); } + fn handle_extensions(&mut self, peer_id: &PeerId, extensions: Extensions) { + let Some(peer) = self.connected_peers.get_mut(peer_id) else { + tracing::error!( + peer=%peer_id, + "Extensions by unknown peer" + ); + return; + }; + + if peer.extensions.is_some() { + tracing::debug!( + peer=%peer_id, + "Peer had already sent us extensions message" + ); + return; + } + + peer.extensions = Some(extensions); + + if extensions.test_extension.unwrap_or(false) + && matches!(peer.kind, PeerKind::Gossipsubv1_3) + { + self.send_message(*peer_id, RpcOut::TestExtension); + } + } + /// Removes the specified peer from the mesh, returning true if it was present. fn remove_peer_from_mesh( &mut self, @@ -1622,7 +1776,7 @@ where px.retain(|p| p.peer_id.is_some()); if px.len() > n { // only use at most prune_peers many random peers - let mut rng = thread_rng(); + let mut rng = rng(); px.partial_shuffle(&mut rng, n); px = px.into_iter().take(n).collect(); } @@ -1936,6 +2090,13 @@ where match subscription.action { SubscriptionAction::Subscribe => { + #[cfg(feature = "partial_messages")] + self.partial_messages_extension.peer_subscribed( + propagation_source, + topic_hash.clone(), + subscription.options, + ); + if peer.topics.insert(topic_hash.clone()) { tracing::debug!( peer=%propagation_source, @@ -2006,6 +2167,10 @@ where } } + #[cfg(feature = "partial_messages")] + self.partial_messages_extension + .peer_unsubscribed(*propagation_source, &topic_hash); + unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { @@ -2191,13 +2356,17 @@ where ); // not enough peers - get mesh_n - current_length more let desired_peers = mesh_n - peers.len(); - let peer_list = - get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| { - !peers.contains(peer) - && !explicit_peers.contains(peer) - && !backoffs.is_backoff_with_slack(topic_hash, peer) - && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0 - }); + let peer_list = get_random_peers( + &self.connected_peers, + topic_hash, + desired_peers, + |peer_id, _| { + !peers.contains(peer_id) + && !explicit_peers.contains(peer_id) + && !backoffs.is_backoff_with_slack(topic_hash, peer_id) + && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0 + }, + ); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2222,7 +2391,7 @@ where let excess_peer_no = peers.len() - mesh_n; // shuffle the peers and then sort by score ascending beginning with the worst - let mut rng = thread_rng(); + let mut rng = rng(); let mut shuffled = peers.iter().copied().collect::>(); shuffled.shuffle(&mut rng); shuffled.sort_by(|p1, p2| { @@ -2294,8 +2463,11 @@ where // if we have not enough outbound peers, graft to some new outbound peers if outbound < mesh_outbound_min { let needed = mesh_outbound_min - outbound; - let peer_list = - get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| { + let peer_list = get_random_peers( + &self.connected_peers, + topic_hash, + needed, + |peer_id, _| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && !backoffs.is_backoff_with_slack(topic_hash, peer_id) @@ -2304,7 +2476,8 @@ where .connected_peers .get(peer_id) .is_some_and(|peer| peer.outbound) - }); + }, + ); for peer in &peer_list { let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); @@ -2370,7 +2543,7 @@ where &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), - |peer_id| { + |peer_id, _| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && !backoffs.is_backoff_with_slack(topic_hash, peer_id) @@ -2396,10 +2569,25 @@ where } } } - // Register the final count of peers in the mesh #[cfg(feature = "metrics")] - if let Some(m) = self.metrics.as_mut() { - m.set_mesh_peers(topic_hash, peers.len()) + { + if let Some(m) = self.metrics.as_mut() { + #[cfg(not(feature = "partial_messages"))] + { + let mesh_peers = peers.len(); + m.set_mesh_peers(topic_hash, mesh_peers, false); + } + #[cfg(feature = "partial_messages")] + { + let (full, partial): (Vec, Vec) = + peers.iter().partition(|peer_id| { + self.partial_messages_extension + .supports_partial(peer_id, topic_hash) + }); + m.set_mesh_peers(topic_hash, full.len(), false); + m.set_mesh_peers(topic_hash, partial.len(), true); + } + } } } @@ -2462,15 +2650,19 @@ where ); let needed_peers = mesh_n - peers.len(); let explicit_peers = &self.explicit_peers; - let new_peers = - get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| { + let new_peers = get_random_peers( + &self.connected_peers, + topic_hash, + needed_peers, + |peer_id, _| { !peers.contains(peer_id) && !explicit_peers.contains(peer_id) && !self .peer_score .below_threshold(peer_id, |ts| ts.publish_threshold) .0 - }); + }, + ); peers.extend(new_peers); } } @@ -2520,7 +2712,7 @@ where } self.failed_messages.shrink_to_fit(); - // Flush stale IDONTWANTs. + // Flush stale IDONTWANTs and partial messages. for peer in self.connected_peers.values_mut() { while let Some((_front, instant)) = peer.dont_send.front() { if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() { @@ -2531,6 +2723,9 @@ where } } + #[cfg(feature = "partial_messages")] + self.partial_messages_extension.heartbeat(); + #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); @@ -2541,7 +2736,7 @@ where /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh /// and fanout peers fn emit_gossip(&mut self) { - let mut rng = thread_rng(); + let mut rng = rng(); let mut messages = Vec::new(); for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash); @@ -2569,15 +2764,19 @@ where ) }; // get gossip_lazy random peers - let to_msg_peers = - get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| { - !peers.contains(peer) - && !self.explicit_peers.contains(peer) + let to_msg_peers = get_random_peers_dynamic( + self.connected_peers.iter(), + topic_hash, + n_map, + |peer_id, _| { + !peers.contains(peer_id) + && !self.explicit_peers.contains(peer_id) && !self .peer_score - .below_threshold(peer, |ts| ts.gossip_threshold) + .below_threshold(peer_id, |ts| ts.gossip_threshold) .0 - }); + }, + ); tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len()); @@ -2871,9 +3070,23 @@ where fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool { #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { - if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc { - // register bytes sent on the internal metrics. - m.msg_sent(&message.topic, message.raw_protobuf_len()); + // register bytes sent on the internal metrics. + match &rpc { + RpcOut::Publish { message, .. } => { + m.msg_sent(&message.topic, false, message.raw_protobuf_len()) + } + + #[cfg(feature = "partial_messages")] + RpcOut::PartialMessage(crate::partial_messages::PartialMessage { + topic_hash, + body, + .. + }) => m.msg_sent( + topic_hash, + true, + body.as_ref().map(|m| m.len()).unwrap_or_default(), + ), + _ => {} } } @@ -2953,7 +3166,26 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. for topic_hash in self.mesh.clone().into_keys() { - self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); + #[cfg(not(feature = "partial_messages"))] + let (requests_partial, supports_partial) = (false, false); + #[cfg(feature = "partial_messages")] + let Some(SubscriptionOpts { + requests_partial, + supports_partial, + }) = self.partial_messages_extension.opts(&topic_hash) + else { + tracing::error!("Partial subscription options should exist for subscribed topic"); + return; + }; + + self.send_message( + peer_id, + RpcOut::Subscribe { + topic: topic_hash.clone(), + requests_partial, + supports_partial, + }, + ); } } @@ -2994,7 +3226,7 @@ where // connection handler. if !peer.connections.is_empty() { for topic in &peer.topics { - if let Some(mesh_peers) = self.mesh.get(topic) { + if let Some(mesh_peers) = self.mesh.get(&topic) { if mesh_peers.contains(&peer_id) { self.events.push_back(ToSwarm::NotifyHandler { peer_id, @@ -3024,7 +3256,6 @@ where #[cfg(feature = "metrics")] if let Some(m) = self.metrics.as_mut() { m.peers_removed(topic, Churn::Dc, 1); - m.set_mesh_peers(topic, mesh_peers.len()); } }; } @@ -3034,6 +3265,9 @@ where m.dec_topic_peers(topic); } + #[cfg(feature = "partial_messages")] + self.partial_messages_extension.peer_disconnected(peer_id); + // remove from fanout self.fanout .get_mut(topic) @@ -3133,16 +3367,33 @@ where messages: Queue::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); + let queue = connected_peer.messages.clone(); + + // If this is the first connection send extensions message. + if connected_peer.connections.len() <= 1 { + self.send_message( + peer_id, + RpcOut::Extensions(Extensions { + test_extension: Some(true), + partial_messages: if cfg!(feature = "partial_messages") { + Some(true) + } else { + None + }, + }), + ); + } + + #[cfg(feature = "partial_messages")] + self.partial_messages_extension.peer_connected(peer_id); // This clones a reference to the Queue so any new handlers reference the same underlying // queue. No data is actually cloned here. - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.messages.clone(), - )) + Ok(Handler::new(self.config.protocol_config(), queue)) } fn handle_established_outbound_connection( @@ -3162,16 +3413,33 @@ where messages: Queue::new(self.config.connection_handler_queue_len()), topics: Default::default(), dont_send: LinkedHashMap::new(), + extensions: None, }); // Add the new connection connected_peer.connections.push(connection_id); + let queue = connected_peer.messages.clone(); + + // If this is the first connection send extensions message. + if connected_peer.connections.len() <= 1 { + self.send_message( + peer_id, + RpcOut::Extensions(Extensions { + test_extension: Some(true), + partial_messages: if cfg!(feature = "partial_messages") { + Some(true) + } else { + None + }, + }), + ); + } + + #[cfg(feature = "partial_messages")] + self.partial_messages_extension.peer_connected(peer_id); // This clones a reference to the Queue so any new handlers reference the same underlying // queue. No data is actually cloned here. - Ok(Handler::new( - self.config.protocol_config(), - connected_peer.messages.clone(), - )) + Ok(Handler::new(self.config.protocol_config(), queue)) } fn on_connection_handler_event( @@ -3340,6 +3608,11 @@ where } } } + ControlAction::Extensions(extensions) => { + if let Some(extensions) = extensions { + self.handle_extensions(&propagation_source, extensions); + } + } } } if !ihave_msgs.is_empty() { @@ -3351,6 +3624,55 @@ where if !prune_msgs.is_empty() { self.handle_prune(&propagation_source, prune_msgs); } + + if let Some(_extension) = rpc.test_extension { + tracing::debug!("Received Test Extension"); + } + + #[cfg(feature = "partial_messages")] + if let Some(partial_message) = rpc.partial_message { + if self + .peer_score + .below_threshold(&propagation_source, |ts| ts.graylist_threshold) + .0 + { + tracing::debug!("Peer below threshold, ignoring partial message"); + } + + let result = self + .partial_messages_extension + .handle_received(propagation_source, partial_message); + match result { + ReceivedAction::Publish(PublishAction::SendMessage { peer_id, rpc }) => { + self.send_message(peer_id, rpc); + } + partial_messages::ReceivedAction::EmitEvent { + topic_hash, + peer_id, + group_id, + message, + metadata, + } => { + self.events + .push_back(ToSwarm::GenerateEvent(Event::Partial { + topic_hash, + peer_id, + group_id, + message, + metadata, + })); + } + ReceivedAction::Publish(PublishAction::PenalizePeer { + peer_id, + topic_hash, + }) => { + if let PeerScoreState::Active(peer_score) = &mut self.peer_score { + peer_score.reject_invalid_partial(peer_id, &topic_hash); + } + } + partial_messages::ReceivedAction::None => {} + } + } } } } @@ -3483,17 +3805,17 @@ fn peer_removed_from_mesh( /// Helper function to get a subset of random gossipsub peers for a `topic_hash` /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. -fn get_random_peers_dynamic( - connected_peers: &HashMap, +fn get_random_peers_dynamic<'a>( + peers: impl IntoIterator, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, - mut f: impl FnMut(&PeerId) -> bool, + f: impl Fn(&PeerId, &PeerDetails) -> bool, ) -> BTreeSet { - let mut gossip_peers = connected_peers - .iter() + let mut gossip_peers = peers + .into_iter() .filter(|(_, p)| p.topics.contains(topic_hash)) - .filter(|(peer_id, _)| f(peer_id)) + .filter(|(peer_id, peer_details)| f(peer_id, peer_details)) .filter(|(_, p)| p.kind.is_gossipsub()) .map(|(peer_id, _)| *peer_id) .collect::>(); @@ -3506,7 +3828,7 @@ fn get_random_peers_dynamic( } // we have more peers than needed, shuffle them and return n of them - let mut rng = thread_rng(); + let mut rng = rng(); gossip_peers.partial_shuffle(&mut rng, n); tracing::debug!("RANDOM PEERS: Got {:?} peers", n); @@ -3516,13 +3838,13 @@ fn get_random_peers_dynamic( /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. -fn get_random_peers( - connected_peers: &HashMap, +fn get_random_peers<'a>( + peers: impl IntoIterator, topic_hash: &TopicHash, n: usize, - f: impl FnMut(&PeerId) -> bool, + f: impl Fn(&PeerId, &PeerDetails) -> bool, ) -> BTreeSet { - get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f) + get_random_peers_dynamic(peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the diff --git a/protocols/gossipsub/src/behaviour/tests/explicit_peers.rs b/protocols/gossipsub/src/behaviour/tests/explicit_peers.rs index a0d9b1b1e1b..be9a3ec7a76 100644 --- a/protocols/gossipsub/src/behaviour/tests/explicit_peers.rs +++ b/protocols/gossipsub/src/behaviour/tests/explicit_peers.rs @@ -264,6 +264,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { &[Subscription { action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), + options: Default::default(), }], peer, ); @@ -312,6 +313,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { &[Subscription { action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), + options: Default::default(), }], peer, ); diff --git a/protocols/gossipsub/src/behaviour/tests/gossip.rs b/protocols/gossipsub/src/behaviour/tests/gossip.rs index fb708cc4d98..2621e2fec96 100644 --- a/protocols/gossipsub/src/behaviour/tests/gossip.rs +++ b/protocols/gossipsub/src/behaviour/tests/gossip.rs @@ -207,6 +207,9 @@ fn test_handle_iwant_msg_but_already_sent_idontwant() { let rpc = RpcIn { messages: vec![], subscriptions: vec![], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![msg_id.clone()], })], diff --git a/protocols/gossipsub/src/behaviour/tests/idontwant.rs b/protocols/gossipsub/src/behaviour/tests/idontwant.rs index 51a3e63c698..554a2bb9f48 100644 --- a/protocols/gossipsub/src/behaviour/tests/idontwant.rs +++ b/protocols/gossipsub/src/behaviour/tests/idontwant.rs @@ -225,6 +225,9 @@ fn parses_idontwant() { let rpc = RpcIn { messages: vec![], subscriptions: vec![], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, control_msgs: vec![ControlAction::IDontWant(IDontWant { message_ids: vec![message_id.clone()], })], diff --git a/protocols/gossipsub/src/behaviour/tests/mesh.rs b/protocols/gossipsub/src/behaviour/tests/mesh.rs index 796f19ea309..90a80ba51dc 100644 --- a/protocols/gossipsub/src/behaviour/tests/mesh.rs +++ b/protocols/gossipsub/src/behaviour/tests/mesh.rs @@ -148,6 +148,7 @@ fn test_get_random_peers() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -157,27 +158,27 @@ fn test_get_random_peers() { ); } - let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_, _| true); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); - let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_, _| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_, _| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_, _| true); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); // test the filter - let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false); + let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_, _| false); assert!(random_peers.is_empty(), "Expected 0 peers to be returned"); let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, { - |peer| peers.contains(peer) + |peer, _| peers.contains(peer) }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); } diff --git a/protocols/gossipsub/src/behaviour/tests/mod.rs b/protocols/gossipsub/src/behaviour/tests/mod.rs index 83ba4e0166d..852641226ab 100644 --- a/protocols/gossipsub/src/behaviour/tests/mod.rs +++ b/protocols/gossipsub/src/behaviour/tests/mod.rs @@ -36,6 +36,8 @@ mod gossip; mod graft_prune; mod idontwant; mod mesh; +#[cfg(feature = "partial_messages")] +mod partial; mod peer_queues; mod publish; mod scoring; @@ -322,6 +324,7 @@ where peer, PeerDetails { kind: kind.unwrap_or(PeerKind::Floodsub), + extensions: None, outbound, connections: vec![connection_id], topics: Default::default(), @@ -355,6 +358,7 @@ where .map(|t| Subscription { action: SubscriptionAction::Subscribe, topic_hash: t, + options: Default::default(), }) .collect::>(), &peer, @@ -501,9 +505,13 @@ pub(super) fn proto_to_message(rpc: &proto::RPC) -> RpcIn { SubscriptionAction::Unsubscribe }, topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), + options: Default::default(), }) .collect(), control_msgs, + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, } } diff --git a/protocols/gossipsub/src/behaviour/tests/partial.rs b/protocols/gossipsub/src/behaviour/tests/partial.rs new file mode 100644 index 00000000000..cefaf0cff42 --- /dev/null +++ b/protocols/gossipsub/src/behaviour/tests/partial.rs @@ -0,0 +1,100 @@ +use std::collections::BTreeSet; + +use hashlink::LinkedHashMap; +use libp2p_core::PeerId; +use libp2p_swarm::{ConnectionId, NetworkBehaviour}; + +use crate::{ + handler::HandlerEvent, + queue::Queue, + rpc_proto::proto, + types::{ControlAction, Extensions, PeerDetails, PeerKind, RpcIn, RpcOut}, + Behaviour, ConfigBuilder, MessageAuthenticity, ValidationMode, +}; + +#[test] +fn test_extensions_message_creation() { + let extensions_rpc = RpcOut::Extensions(Extensions { + test_extension: Some(true), + partial_messages: None, + }); + let proto_rpc: proto::RPC = extensions_rpc.into(); + + assert!(proto_rpc.control.is_some()); + let control = proto_rpc.control.unwrap(); + assert!(control.extensions.is_some()); + let test_extension = control.extensions.unwrap().testExtension.unwrap(); + assert!(test_extension); + assert!(control.ihave.is_empty()); + assert!(control.iwant.is_empty()); + assert!(control.graft.is_empty()); + assert!(control.prune.is_empty()); + assert!(control.idontwant.is_empty()); +} + +#[test] +fn test_handle_extensions_message() { + let mut gs: Behaviour = Behaviour::new( + MessageAuthenticity::Anonymous, + ConfigBuilder::default() + .validation_mode(ValidationMode::None) + .build() + .unwrap(), + ) + .unwrap(); + + let peer_id = PeerId::random(); + let messages = Queue::new(gs.config.connection_handler_queue_len()); + + // Add peer without extensions + gs.connected_peers.insert( + peer_id, + PeerDetails { + kind: PeerKind::Gossipsubv1_3, + connections: vec![ConnectionId::new_unchecked(0)], + outbound: false, + topics: BTreeSet::new(), + messages, + dont_send: LinkedHashMap::new(), + extensions: None, + }, + ); + + // Simulate receiving extensions message + let extensions = Extensions { + test_extension: Some(false), + partial_messages: None, + }; + gs.handle_extensions(&peer_id, extensions); + + // Verify extensions were stored + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + assert!(peer_details.extensions.is_some()); + + // Simulate receiving duplicate extensions message from another peer + let duplicate_rpc = RpcIn { + messages: vec![], + subscriptions: vec![], + control_msgs: vec![ControlAction::Extensions(Some(Extensions { + test_extension: Some(true), + partial_messages: None, + }))], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, + }; + + gs.on_connection_handler_event( + peer_id, + ConnectionId::new_unchecked(0), + HandlerEvent::Message { + rpc: duplicate_rpc, + invalid_messages: vec![], + }, + ); + + // Extensions should still be present (not cleared or changed) + let peer_details = gs.connected_peers.get(&peer_id).unwrap(); + let test_extension = peer_details.extensions.unwrap().test_extension.unwrap(); + assert!(!test_extension); +} diff --git a/protocols/gossipsub/src/behaviour/tests/peer_queues.rs b/protocols/gossipsub/src/behaviour/tests/peer_queues.rs index f70ae66d257..1201a3af57c 100644 --- a/protocols/gossipsub/src/behaviour/tests/peer_queues.rs +++ b/protocols/gossipsub/src/behaviour/tests/peer_queues.rs @@ -56,6 +56,7 @@ fn test_all_queues_full() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -93,6 +94,7 @@ fn test_slow_peer_returns_failed_publish() { slow_peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -106,6 +108,7 @@ fn test_slow_peer_returns_failed_publish() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -158,6 +161,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { slow_peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -175,6 +179,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -263,6 +268,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { slow_peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -280,6 +286,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -348,6 +355,7 @@ fn test_slow_peer_returns_failed_forward() { slow_peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -365,6 +373,7 @@ fn test_slow_peer_returns_failed_forward() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -438,6 +447,7 @@ fn test_slow_peer_is_downscored_on_publish() { slow_peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), @@ -452,6 +462,7 @@ fn test_slow_peer_is_downscored_on_publish() { peer_id, PeerDetails { kind: PeerKind::Gossipsubv1_1, + extensions: None, connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), diff --git a/protocols/gossipsub/src/behaviour/tests/scoring.rs b/protocols/gossipsub/src/behaviour/tests/scoring.rs index 977d7488956..5f1ade217bd 100644 --- a/protocols/gossipsub/src/behaviour/tests/scoring.rs +++ b/protocols/gossipsub/src/behaviour/tests/scoring.rs @@ -664,6 +664,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let subscription = Subscription { action: SubscriptionAction::Subscribe, topic_hash: topics[0].clone(), + options: Default::default(), }; let control_action = ControlAction::IHave(IHave { @@ -683,6 +684,9 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message1], subscriptions: vec![subscription.clone()], control_msgs: vec![control_action], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, }, invalid_messages: Vec::new(), }, @@ -709,6 +713,9 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { messages: vec![raw_message3], subscriptions: vec![subscription], control_msgs: vec![control_action], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, }, invalid_messages: Vec::new(), }, @@ -1303,6 +1310,9 @@ fn test_scoring_p4_invalid_signature() { messages: vec![], subscriptions: vec![], control_msgs: vec![], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, }, invalid_messages: vec![(m, ValidationError::InvalidSignature)], }, diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index e9d09f16c66..3c423864734 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -59,7 +59,7 @@ fn test_subscribe() { .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe(_)) = queue.try_pop() { + if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { collected_subscriptions += 1 } } @@ -119,7 +119,7 @@ fn test_unsubscribe() { .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe(_)) = queue.try_pop() { + if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { collected_subscriptions += 1 } } @@ -240,6 +240,7 @@ fn test_join() { random_peer, PeerDetails { kind: PeerKind::Floodsub, + extensions: None, outbound: false, connections: vec![connection_id], topics: Default::default(), @@ -308,7 +309,7 @@ fn test_peer_added_on_connection() { HashMap::>::new(), |mut collected_subscriptions, (peer, mut queue)| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe(topic)) = queue.try_pop() { + if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() { let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); peer_subs.push(topic.into_string()); collected_subscriptions.insert(peer, peer_subs); @@ -367,12 +368,14 @@ fn test_handle_received_subscriptions() { .map(|topic_hash| Subscription { action: SubscriptionAction::Subscribe, topic_hash: topic_hash.clone(), + options: Default::default(), }) .collect::>(); subscriptions.push(Subscription { action: SubscriptionAction::Unsubscribe, topic_hash: topic_hashes[topic_hashes.len() - 1].clone(), + options: Default::default(), }); let unknown_peer = PeerId::random(); @@ -430,6 +433,7 @@ fn test_handle_received_subscriptions() { &[Subscription { action: SubscriptionAction::Unsubscribe, topic_hash: topic_hashes[0].clone(), + options: Default::default(), }], &peers[0], ); diff --git a/protocols/gossipsub/src/behaviour/tests/topic_config.rs b/protocols/gossipsub/src/behaviour/tests/topic_config.rs index ff14c6f0d01..6053121d87c 100644 --- a/protocols/gossipsub/src/behaviour/tests/topic_config.rs +++ b/protocols/gossipsub/src/behaviour/tests/topic_config.rs @@ -636,6 +636,9 @@ fn test_validation_error_message_size_too_large_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, }, invalid_messages: vec![], }, @@ -680,6 +683,8 @@ fn test_validation_error_message_size_too_large_topic_specific() { }], subscriptions: vec![], control: None, + partial: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); @@ -740,6 +745,9 @@ fn test_validation_message_size_within_topic_specific() { messages: vec![raw_message], subscriptions: vec![], control_msgs: vec![], + test_extension: None, + #[cfg(feature = "partial_messages")] + partial_message: None, }, invalid_messages: vec![], }, @@ -784,6 +792,8 @@ fn test_validation_message_size_within_topic_specific() { }], subscriptions: vec![], control: None, + partial: None, + testExtension: None, }; codec.encode(rpc, &mut buf).unwrap(); diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index 7af14d84ac0..f8d98ec2fef 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -40,6 +40,10 @@ pub enum PublishError { /// Messages could not be sent because the queues for all peers were full. The usize represents /// the number of peers that were attempted. AllQueuesFull(usize), + + /// An Error while trying to publish a partial message. + #[cfg(feature = "partial_messages")] + Partial(crate::extensions::partial_messages::PartialError), } impl std::fmt::Display for PublishError { diff --git a/protocols/gossipsub/src/extensions/mod.rs b/protocols/gossipsub/src/extensions/mod.rs new file mode 100644 index 00000000000..e2f92481674 --- /dev/null +++ b/protocols/gossipsub/src/extensions/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2025 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#[cfg(feature = "partial_messages")] +pub mod partial_messages; diff --git a/protocols/gossipsub/src/extensions/partial_messages.rs b/protocols/gossipsub/src/extensions/partial_messages.rs new file mode 100644 index 00000000000..6a540a7ecf6 --- /dev/null +++ b/protocols/gossipsub/src/extensions/partial_messages.rs @@ -0,0 +1,609 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, +}; + +use libp2p_core::PeerId; + +use crate::{ + types::{RpcOut, SubscriptionOpts}, + TopicHash, +}; + +/// Default TTL for partial messages kept. +const DEFAULT_PARTIAL_TTL: usize = 5; + +/// PartialMessage is a message that can be broken up into parts. +/// This trait allows applications to define custom strategies for splitting large messages +/// into parts and reconstructing them from received partial data. It provides the core +/// operations needed for the gossipsub partial messages extension. +/// +/// The partial message protocol works as follows: +/// 1. Applications implement this trait to define how messages are split and reconstructed +/// 2. Peers advertise available parts using `available_parts()` metadata in PartialIHAVE +/// 3. Peers request missing parts using `missing_parts()` metadata in PartialIWANT +/// 4. When requests are received, `partial_message_bytes_from_metadata()` generates the response +/// 5. Received partial data is integrated using `extend_from_encoded_partial_message()` +/// 6. The `group_id()` ties all parts of the same logical message together +pub trait Partial: Send + Sync { + /// Returns the unique identifier for this message group. + /// + /// All partial messages belonging to the same logical message should return + /// the same group ID. This is used to associate partial messages together + /// during reconstruction. + fn group_id(&self) -> Vec; + + /// Returns application defined metadata describing which parts of the message + /// are available and which parts we want. + /// + /// The returned bytes will be sent in partsMetadata field to advertise + /// available and wanted parts to peers. + fn metadata(&self) -> Vec; + + /// Generates an action from the given metadata. + /// + /// When a peer requests specific parts (via PartialIWANT), this method + /// generates the actual message data to send back. The `metadata` parameter + /// describes what parts are being requested. + /// + /// Returns a [`PublishAction`] for the given metadata, or an error. + fn partial_action_from_metadata( + &self, + peer_id: PeerId, + metadata: Option<&[u8]>, + ) -> Result; +} + +pub trait Metadata: Debug + Send + Sync { + /// Return the `Metadata` as a byte slice. + fn as_slice(&self) -> &[u8]; + /// try to Update the `Metadata` with the remote data, + /// return true if it was updated. + fn update(&mut self, data: &[u8]) -> Result; +} + +/// Indicates the action to take for the given metadata. +pub struct PartialAction { + /// Indicate if we want remote data from the peer. + pub need: bool, + /// Indicate if we have data to send for that peer + pub send: Option<(Vec, Box)>, +} + +/// Partial message state for sent and received messages. +#[derive(Default)] +pub(crate) struct State { + /// Our subscription options per topic and respective cached partial messages we're publishing. + pub(crate) subscriptions: HashMap, + /// Per-peer partial state + pub(crate) peer_subscriptions: HashMap>, +} + +impl State { + /// Called by the [`Behaviour`] when we subscribed to the topic. + pub(crate) fn subscribe( + &mut self, + topic_hash: TopicHash, + supports_partial: bool, + requests_partial: bool, + ) { + self.subscriptions.insert( + topic_hash, + LocalSubscription { + options: Some(SubscriptionOpts { + requests_partial, + supports_partial, + }), + partial_messages: Default::default(), + }, + ); + } + + /// Called by the [`Behaviour`] when we unsubscribed from the topic. + pub(crate) fn unsubscribe(&mut self, topic_hash: &TopicHash) { + self.subscriptions.remove(&topic_hash.clone()); + } + + /// Called by the [`Behaviour`] when a peer has connected. + pub(crate) fn peer_connected(&mut self, peer_id: PeerId) { + self.peer_subscriptions.insert(peer_id, Default::default()); + } + + /// Called by the [`Behaviour`] when a peer has disconnected. + pub(crate) fn peer_disconnected(&mut self, peer_id: PeerId) { + self.peer_subscriptions.remove(&peer_id); + } + + /// Called by the [`Behaviour`] when a remote peer unsubscribed from the topic. + pub(crate) fn peer_subscribed( + &mut self, + peer_id: &PeerId, + topic_hash: TopicHash, + options: SubscriptionOpts, + ) { + let Some(peer) = self.peer_subscriptions.get_mut(peer_id) else { + tracing::error!( + %peer_id, + "Partial subscription by unknown peer" + ); + return; + }; + peer.insert( + topic_hash, + RemoteSubscription { + options: Some(options), + partial_messages: Default::default(), + }, + ); + } + + pub(crate) fn peer_unsubscribed(&mut self, peer_id: PeerId, topic_hash: &TopicHash) { + let Some(peer) = self.peer_subscriptions.get_mut(&peer_id) else { + tracing::error!( + %peer_id, + "Partial unsubscription by unknown peer" + ); + return; + }; + peer.remove(topic_hash); + } + + /// Called by the [`Behaviour`] during heartbeat. + /// Returns the list of the peers and respective partial metadata to send. + pub(crate) fn heartbeat(&mut self) { + for peer_state in self.peer_subscriptions.values_mut() { + for topics in peer_state.values_mut() { + topics.partial_messages.retain(|_, partial| { + partial.ttl -= 1; + partial.ttl != 0 + }); + } + } + + for subscription in self.subscriptions.values_mut() { + subscription.partial_messages.retain(|_, partial| { + partial.ttl -= 1; + partial.ttl != 0 + }); + } + } + + /// Called by the [`Behaviour`] when a partial message is received. + pub(crate) fn handle_received( + &mut self, + peer_id: PeerId, + message: PartialMessage, + ) -> ReceivedAction { + let Some(peer_subscriptions) = self.peer_subscriptions.get_mut(&peer_id) else { + tracing::error!(peer = %peer_id, + topic_hash = ?message.topic_hash, + "Partial message received from an unknown peer"); + return ReceivedAction::None; + }; + + // If the peer has sent us a partial message without a subscription message first, + // insert it on the list with the defaults, supports_partial = false. + let peer_partials = peer_subscriptions + .entry(message.topic_hash.clone()) + .or_default(); + + let peer_partial = peer_partials + .partial_messages + .entry(message.group_id.clone()) + .or_default(); + + // Check if the local partial data we have from the peer is oudated. + let metadata_updated = match (&mut peer_partial.metadata, &message.metadata) { + (None, Some(remote_metadata)) => { + peer_partial.metadata = Some(PeerMetadata::Remote(remote_metadata.clone())); + true + } + (Some(PeerMetadata::Remote(ref metadata)), Some(remote_metadata)) => { + if metadata != remote_metadata { + peer_partial.metadata = Some(PeerMetadata::Remote(remote_metadata.clone())); + true + } else { + false + } + } + (Some(PeerMetadata::Local(metadata)), Some(remote_metadata)) => { + match metadata.update(remote_metadata) { + Ok(updated) => updated, + Err(err) => { + tracing::debug!( + peer=%peer_id, + topic=%message.topic_hash, + group_id=?message.group_id, + err=%err, + "Error updating Partial metadata" + ); + return ReceivedAction::Publish(PublishAction::PenalizePeer { + peer_id, + topic_hash: message.topic_hash.clone(), + }); + } + } + } + (Some(_), None) | (None, None) => false, + }; + + if !metadata_updated && message.body.is_none() { + return ReceivedAction::None; + } + + // We may have already received other partials from this and other peers, + // but haven't responded to them yet, in those situations just return + // the partial to the application layer. + let Some(local_partial) = self + .subscriptions + .get_mut(&message.topic_hash) + .and_then(|t| t.partial_messages.get(&message.group_id)) + else { + return ReceivedAction::EmitEvent { + topic_hash: message.topic_hash, + peer_id, + group_id: message.group_id, + message: message.body, + metadata: message.metadata, + }; + }; + + let action = match local_partial + .partial + .partial_action_from_metadata(peer_id, message.metadata.as_deref()) + { + Ok(action) => action, + Err(err) => { + tracing::debug!(peer = %peer_id, group_id = ?message.group_id,err = %err, + "Could not reconstruct message bytes for peer metadata from a received partial"); + // Should we remove the partial from the peer? + peer_partials.partial_messages.remove(&message.group_id); + return ReceivedAction::Publish(PublishAction::PenalizePeer { + peer_id, + topic_hash: message.topic_hash.clone(), + }); + } + }; + + // check if we have new data for that peer. + let Some((body, peer_updated_metadata)) = action.send else { + return ReceivedAction::None; + }; + peer_partial.metadata = Some(PeerMetadata::Local(peer_updated_metadata)); + + let cached_metadata = local_partial.partial.metadata().as_slice().to_vec(); + ReceivedAction::Publish(PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(PartialMessage { + body: Some(body), + metadata: Some(cached_metadata), + group_id: message.group_id.clone(), + topic_hash: message.topic_hash.clone(), + }), + }) + } + + /// Check if the peer requests partial messages for the topic. + pub(crate) fn requests_partial(&self, peer_id: &PeerId, topic_hash: &TopicHash) -> bool { + self.peer_subscriptions + .get(peer_id) + .and_then(|subscription| subscription.get(topic_hash)) + .and_then(|s| s.options) + .map(|options| options.requests_partial) + .unwrap_or(false) + } + + /// Check if the peer supports partial messages for the topic. + pub(crate) fn supports_partial(&self, peer_id: &PeerId, topic_hash: &TopicHash) -> bool { + self.peer_subscriptions + .get(peer_id) + .and_then(|subscriptions| subscriptions.get(topic_hash)) + .and_then(|s| s.options) + .map(|options| options.supports_partial) + .unwrap_or_default() + } + + /// Get our partial opts for a topic (used by Behaviour when sending Subscribe) + pub(crate) fn opts(&self, topic: &TopicHash) -> Option { + self.subscriptions + .get(topic) + .and_then(|subscription| subscription.options) + } + + /// Check if the peer has sent us message on the provided topic and group_id. + pub(crate) fn group_id( + &self, + peer_id: &PeerId, + topic_hash: &TopicHash, + group_id: &[u8], + ) -> bool { + self.peer_subscriptions + .get(peer_id) + .and_then(|peer| peer.get(topic_hash)) + .and_then(|subscription| subscription.partial_messages.get(group_id)) + .is_some() + } + + /// Determines the actions to take based on the provided recipients and partial. + /// Returns the actions for the Behaviour to execute. + pub(crate) fn handle_publish( + &mut self, + topic_hash: TopicHash, + partial_message: P, + recipients: HashSet, + ) -> Vec { + let mut actions = vec![]; + let group_id = partial_message.group_id(); + + let publish_metadata = partial_message.metadata(); + for peer_id in recipients { + let Some(subscription) = self + .peer_subscriptions + .get_mut(&peer_id) + .and_then(|subscriptions| subscriptions.get_mut(&topic_hash)) + else { + tracing::error!(peer = %peer_id, + "Could not get partial subscripion from peer which subscribed for partial messages"); + continue; + }; + + let group_partials = subscription + .partial_messages + .entry(group_id.clone()) + .or_default(); + + // Peer `supports_partial` but doesn't `requests_partial`. + // We assume peer requests_partial is true as that has already been filtered + // by the behavior. + if !subscription.options.unwrap_or_default().requests_partial { + actions.push(PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(PartialMessage { + body: None, + metadata: Some(publish_metadata.clone()), + group_id: group_id.clone(), + topic_hash: topic_hash.clone(), + }), + }); + continue; + } + + let Ok(action) = partial_message.partial_action_from_metadata( + peer_id, + group_partials.metadata.as_ref().map(|p| p.as_ref()), + ) else { + tracing::error!(peer = %peer_id, group_id = ?group_id, + "Could not reconstruct message bytes for peer metadata"); + subscription.partial_messages.remove(&group_id); + actions.push(PublishAction::PenalizePeer { + peer_id, + topic_hash: topic_hash.clone(), + }); + continue; + }; + + // Check if we have new data for the peer. + let body = if let Some((body, peer_updated_metadata)) = action.send { + // We have something to send, update the peer's metadata. + group_partials.metadata = Some(PeerMetadata::Local(peer_updated_metadata)); + Some(body) + } else if group_partials.metadata.is_none() || action.need { + // We have no data to eagerly send, but we want to transmit our metadata anyway, to + // let the peer know of our metadata so that it sends us its data. + None + } else { + continue; + }; + + actions.push(PublishAction::SendMessage { + peer_id, + rpc: RpcOut::PartialMessage(PartialMessage { + group_id: group_id.clone(), + topic_hash: topic_hash.clone(), + body, + metadata: Some(publish_metadata.clone()), + }), + }); + } + + // Cache the sent partial + let topic_partials = self.subscriptions.entry(topic_hash).or_default(); + topic_partials.partial_messages.insert( + partial_message.group_id(), + LocalPartial { + partial: Box::new(partial_message), + ttl: DEFAULT_PARTIAL_TTL, + }, + ); + + actions + } +} + +/// Action returned by `State::partial_action`. +#[allow(clippy::large_enum_variant)] +pub(crate) enum PublishAction { + /// Send a partial message RPC to a peer + SendMessage { peer_id: PeerId, rpc: RpcOut }, + /// Penalize peer for invalid partial + PenalizePeer { + peer_id: PeerId, + topic_hash: TopicHash, + }, +} + +/// Action returned by `State::handle_received`. +pub(crate) enum ReceivedAction { + /// Do not do anything. + None, + /// Emit a Partial event to the application + EmitEvent { + topic_hash: TopicHash, + peer_id: PeerId, + group_id: Vec, + message: Option>, + metadata: Option>, + }, + Publish(PublishAction), +} + +/// A Partial message sent and received from remote peers. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PartialMessage { + /// The group ID that identifies the complete logical message. + pub group_id: Vec, + /// The topic ID this partial message belongs to. + pub topic_hash: TopicHash, + /// The partial message itself. + pub body: Option>, + /// The partial metadata we have and want. + pub metadata: Option>, +} + +/// Stored `Metadata` for a peer, +/// `Remote` or `Local` depends on who last updated it. +#[derive(Debug)] +pub(crate) enum PeerMetadata { + /// The metadata was updated with data from a remote peer. + Remote(Vec), + /// The metadata was updated by us when publishing a partial message. + Local(Box), +} + +impl AsRef<[u8]> for PeerMetadata { + fn as_ref(&self) -> &[u8] { + match self { + PeerMetadata::Remote(metadata) => metadata, + PeerMetadata::Local(metadata) => metadata.as_slice(), + } + } +} + +/// Partial options when subscribing a topic. +#[derive(Default)] +pub(crate) struct LocalSubscription { + /// Subscription options, None if we have not subscribe to the topic. + options: Option, + /// Partial messages we have sent us on the topic subscription. + partial_messages: HashMap, LocalPartial>, +} + +/// A topic subscribed by a remote peer. +#[derive(Debug, Default)] +pub(crate) struct RemoteSubscription { + /// Subscription options, None if peer has not subscribe to the topic. + options: Option, + /// Partial messages that the peer has sent us on the topic subscription. + partial_messages: HashMap, RemotePartial>, +} + +/// a local cached sent partial message. +struct LocalPartial { + partial: Box, + ttl: usize, +} + +/// A partial message data the peer has sent us. +#[derive(Debug)] +struct RemotePartial { + /// The current peer partial metadata. + metadata: Option, + /// The remaining heartbeats for this message to be deleted. + ttl: usize, +} + +impl Default for RemotePartial { + fn default() -> Self { + Self { + metadata: Default::default(), + ttl: DEFAULT_PARTIAL_TTL, + } + } +} + +/// Errors that can occur during partial message processing. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PartialError { + /// The received data is too short to contain required headers/metadata. + InsufficientData { + /// Expected minimum number of bytes. + expected: usize, + /// Actual number of bytes received. + received: usize, + }, + + /// The data format is invalid or corrupted. + InvalidFormat, + + /// The partial data doesn't belong to this message group. + WrongGroup { + /// Group Id of the received message. + received: Vec, + }, + + /// The partial data is a duplicate of already received data. + DuplicateData(Vec), + + /// The partial data is out of the expected range or sequence. + OutOfRange, + + /// The message is already complete and cannot accept more data. + AlreadyComplete, + + /// Application-specific validation failed. + ValidationFailed, +} + +impl std::error::Error for PartialError {} + +impl std::fmt::Display for PartialError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InsufficientData { expected, received } => { + write!( + f, + "Insufficient data: expected at least {} bytes, got {}", + expected, received + ) + } + Self::InvalidFormat => { + write!(f, "Invalid data format") + } + Self::WrongGroup { received } => { + write!(f, "Wrong group ID: got {:?}", received) + } + Self::DuplicateData(part_id) => { + write!(f, "Duplicate data for part {:?}", part_id) + } + Self::OutOfRange => { + write!(f, "Data out of range") + } + Self::AlreadyComplete => { + write!(f, "Message is already complete") + } + Self::ValidationFailed => { + write!(f, "Validation failed") + } + } + } +} diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 24ac80d2755..aaaa96680d9 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -19,6 +19,8 @@ pub struct RPC { pub subscriptions: Vec, pub publish: Vec, pub control: Option, + pub partial: Option, + pub testExtension: Option, } impl<'a> MessageRead<'a> for RPC { @@ -29,6 +31,8 @@ impl<'a> MessageRead<'a> for RPC { Ok(10) => msg.subscriptions.push(r.read_message::(bytes)?), Ok(18) => msg.publish.push(r.read_message::(bytes)?), Ok(26) => msg.control = Some(r.read_message::(bytes)?), + Ok(82) => msg.partial = Some(r.read_message::(bytes)?), + Ok(51939474) => msg.testExtension = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -43,12 +47,16 @@ impl MessageWrite for RPC { + self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.publish.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.control.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.partial.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { for s in &self.subscriptions { w.write_with_tag(10, |w| w.write_message(s))?; } for s in &self.publish { w.write_with_tag(18, |w| w.write_message(s))?; } if let Some(ref s) = self.control { w.write_with_tag(26, |w| w.write_message(s))?; } + if let Some(ref s) = self.partial { w.write_with_tag(82, |w| w.write_message(s))?; } + if let Some(ref s) = self.testExtension { w.write_with_tag(51939474, |w| w.write_message(s))?; } Ok(()) } } @@ -62,6 +70,8 @@ use super::*; pub struct SubOpts { pub subscribe: Option, pub topic_id: Option, + pub requestsPartial: Option, + pub supportsPartial: Option, } impl<'a> MessageRead<'a> for SubOpts { @@ -71,6 +81,8 @@ impl<'a> MessageRead<'a> for SubOpts { match r.next_tag(bytes) { Ok(8) => msg.subscribe = Some(r.read_bool(bytes)?), Ok(18) => msg.topic_id = Some(r.read_string(bytes)?.to_owned()), + Ok(24) => msg.requestsPartial = Some(r.read_bool(bytes)?), + Ok(32) => msg.supportsPartial = Some(r.read_bool(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -84,11 +96,15 @@ impl MessageWrite for SubOpts { 0 + self.subscribe.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + self.topic_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.requestsPartial.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.supportsPartial.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) } fn write_message(&self, w: &mut Writer) -> Result<()> { if let Some(ref s) = self.subscribe { w.write_with_tag(8, |w| w.write_bool(*s))?; } if let Some(ref s) = self.topic_id { w.write_with_tag(18, |w| w.write_string(&**s))?; } + if let Some(ref s) = self.requestsPartial { w.write_with_tag(24, |w| w.write_bool(*s))?; } + if let Some(ref s) = self.supportsPartial { w.write_with_tag(32, |w| w.write_bool(*s))?; } Ok(()) } } @@ -155,6 +171,7 @@ pub struct ControlMessage { pub graft: Vec, pub prune: Vec, pub idontwant: Vec, + pub extensions: Option, } impl<'a> MessageRead<'a> for ControlMessage { @@ -167,6 +184,7 @@ impl<'a> MessageRead<'a> for ControlMessage { Ok(26) => msg.graft.push(r.read_message::(bytes)?), Ok(34) => msg.prune.push(r.read_message::(bytes)?), Ok(42) => msg.idontwant.push(r.read_message::(bytes)?), + Ok(50) => msg.extensions = Some(r.read_message::(bytes)?), Ok(t) => { r.read_unknown(bytes, t)?; } Err(e) => return Err(e), } @@ -183,6 +201,7 @@ impl MessageWrite for ControlMessage { + self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + self.idontwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.extensions.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) } fn write_message(&self, w: &mut Writer) -> Result<()> { @@ -191,6 +210,7 @@ impl MessageWrite for ControlMessage { for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; } for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; } for s in &self.idontwant { w.write_with_tag(42, |w| w.write_message(s))?; } + if let Some(ref s) = self.extensions { w.write_with_tag(50, |w| w.write_message(s))?; } Ok(()) } } @@ -367,6 +387,55 @@ impl MessageWrite for ControlIDontWant { } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct ControlExtensions { + pub partialMessages: Option, + pub testExtension: Option, +} + +impl<'a> MessageRead<'a> for ControlExtensions { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(80) => msg.partialMessages = Some(r.read_bool(bytes)?), + Ok(51939472) => msg.testExtension = Some(r.read_bool(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for ControlExtensions { + fn get_size(&self) -> usize { + 0 + + self.partialMessages.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64)) + + self.testExtension.as_ref().map_or(0, |m| 4 + sizeof_varint(*(m) as u64)) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.partialMessages { w.write_with_tag(80, |w| w.write_bool(*s))?; } + if let Some(ref s) = self.testExtension { w.write_with_tag(51939472, |w| w.write_bool(*s))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct TestExtension { } + +impl<'a> MessageRead<'a> for TestExtension { + fn from_reader(r: &mut BytesReader, _: &[u8]) -> Result { + r.read_to_end(); + Ok(Self::default()) + } +} + +impl MessageWrite for TestExtension { } + #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Default, PartialEq, Clone)] pub struct PeerInfo { @@ -601,3 +670,47 @@ impl<'a> From<&'a str> for EncMode { } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct PartialMessagesExtension { + pub topicID: Option>, + pub groupID: Option>, + pub partialMessage: Option>, + pub partsMetadata: Option>, +} + +impl<'a> MessageRead<'a> for PartialMessagesExtension { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.topicID = Some(r.read_bytes(bytes)?.to_owned()), + Ok(18) => msg.groupID = Some(r.read_bytes(bytes)?.to_owned()), + Ok(26) => msg.partialMessage = Some(r.read_bytes(bytes)?.to_owned()), + Ok(34) => msg.partsMetadata = Some(r.read_bytes(bytes)?.to_owned()), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for PartialMessagesExtension { + fn get_size(&self) -> usize { + 0 + + self.topicID.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.groupID.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.partialMessage.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + + self.partsMetadata.as_ref().map_or(0, |m| 1 + sizeof_len((m).len())) + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.topicID { w.write_with_tag(10, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.groupID { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.partialMessage { w.write_with_tag(26, |w| w.write_bytes(&**s))?; } + if let Some(ref s) = self.partsMetadata { w.write_with_tag(34, |w| w.write_bytes(&**s))?; } + Ok(()) + } +} + diff --git a/protocols/gossipsub/src/generated/rpc.proto b/protocols/gossipsub/src/generated/rpc.proto index fe4d3bc9366..682770eb933 100644 --- a/protocols/gossipsub/src/generated/rpc.proto +++ b/protocols/gossipsub/src/generated/rpc.proto @@ -9,9 +9,24 @@ message RPC { message SubOpts { optional bool subscribe = 1; // subscribe or unsubscribe optional string topic_id = 2; + // Used with Partial Messages extension. + // If set to true, signals to the receiver that the sender prefers partial + // messages. + optional bool requestsPartial = 3; + // If set to true, signals to the receiver that the sender supports sending + // partial messages on this topic. If requestsPartial is true, this is + // assumed to be true. + optional bool supportsPartial = 4; } optional ControlMessage control = 3; + // Canonical Extensions should register their messages here. + optional PartialMessagesExtension partial = 10; + + // Experimental Extensions should register their messages here. They + // must use field numbers larger than 0x200000 to be encoded with at least 4 + // bytes + optional TestExtension testExtension = 6492434; } message Message { @@ -29,6 +44,7 @@ message ControlMessage { repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; repeated ControlIDontWant idontwant = 5; + optional ControlExtensions extensions = 6; } message ControlIHave { @@ -51,9 +67,19 @@ message ControlPrune { } message ControlIDontWant { - repeated bytes message_ids = 1; + repeated bytes message_ids = 1; } +message ControlExtensions { + optional bool partialMessages = 10; + + // Experimental extensions must use field numbers larger than 0x200000 to be + // encoded with at least 4 bytes + optional bool testExtension = 6492434; +} + +message TestExtension {} + message PeerInfo { optional bytes peer_id = 1; optional bytes signed_peer_record = 2; @@ -87,3 +113,14 @@ message TopicDescriptor { } } } + +message PartialMessagesExtension { + optional bytes topicID = 1; + optional bytes groupID = 2; + + // An encoded partial message + optional bytes partialMessage = 3; + + // An encoded representation of the parts a peer has and wants. + optional bytes partsMetadata = 4; +} diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index f1d42d6cddb..27c5a8ad9db 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -98,6 +98,7 @@ mod backoff; mod behaviour; mod config; mod error; +pub mod extensions; mod gossip_promises; mod handler; mod mcache; @@ -113,6 +114,9 @@ mod topic; mod transform; mod types; +#[cfg(feature = "partial_messages")] +pub use self::extensions::partial_messages; + #[cfg(feature = "metrics")] pub use metrics::Config as MetricsConfig; diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 1394d9a92a7..f9367eeeec7 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -137,7 +137,7 @@ pub(crate) struct Metrics { // Metrics regarding mesh state /// Number of peers in our mesh. This metric should be updated with the count of peers for a /// topic in the mesh regardless of inclusion and churn events. - mesh_peer_counts: Family, + mesh_peer_counts: Family, /// Number of times we include peers in a topic mesh for different reasons. mesh_peer_inclusion_events: Family, /// Number of times we remove peers in a topic mesh for different reasons. @@ -147,7 +147,7 @@ pub(crate) struct Metrics { /// Number of gossip messages sent to each topic. topic_msg_sent_counts: Family, /// Bytes from gossip messages sent to each topic. - topic_msg_sent_bytes: Family, + topic_msg_sent_bytes: Family, /// Number of gossipsub messages published to each topic. topic_msg_published: Family, @@ -451,24 +451,52 @@ impl Metrics { /// Registers the subscription to a topic if the configured limits allow it. /// Sets the registered number of peers in the mesh to 0. - pub(crate) fn joined(&mut self, topic: &TopicHash) { - if self.topic_info.contains_key(topic) || self.topic_info.len() < self.max_topics { - self.topic_info.insert(topic.clone(), true); - let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(1); + pub(crate) fn joined(&mut self, topic_hash: &TopicHash) { + if self.topic_info.contains_key(topic_hash) || self.topic_info.len() < self.max_topics { + self.topic_info.insert(topic_hash.clone(), true); + let was_subscribed = self + .topic_subscription_status + .get_or_create(topic_hash) + .set(1); debug_assert_eq!(was_subscribed, 0); - self.mesh_peer_counts.get_or_create(topic).set(0); + self.mesh_peer_counts + .get_or_create(&MeshPeerLabel { + supports_partial: true, + topic_hash: topic_hash.to_string(), + }) + .set(0); + self.mesh_peer_counts + .get_or_create(&MeshPeerLabel { + supports_partial: false, + topic_hash: topic_hash.to_string(), + }) + .set(0); } } /// Registers the unsubscription to a topic if the topic was previously allowed. /// Sets the registered number of peers in the mesh to 0. - pub(crate) fn left(&mut self, topic: &TopicHash) { - if self.topic_info.contains_key(topic) { + pub(crate) fn left(&mut self, topic_hash: &TopicHash) { + if self.topic_info.contains_key(topic_hash) { // Depending on the configured topic bounds we could miss a mesh topic. // So, check first if the topic was previously allowed. - let was_subscribed = self.topic_subscription_status.get_or_create(topic).set(0); + let was_subscribed = self + .topic_subscription_status + .get_or_create(topic_hash) + .set(0); debug_assert_eq!(was_subscribed, 1); - self.mesh_peer_counts.get_or_create(topic).set(0); + self.mesh_peer_counts + .get_or_create(&MeshPeerLabel { + supports_partial: true, + topic_hash: topic_hash.to_string(), + }) + .set(0); + self.mesh_peer_counts + .get_or_create(&MeshPeerLabel { + supports_partial: false, + topic_hash: topic_hash.to_string(), + }) + .set(0); } } @@ -497,10 +525,20 @@ impl Metrics { } /// Register the current number of peers in our mesh for this topic. - pub(crate) fn set_mesh_peers(&mut self, topic: &TopicHash, count: usize) { - if self.register_topic(topic).is_ok() { + pub(crate) fn set_mesh_peers( + &mut self, + topic_hash: &TopicHash, + count: usize, + supports_partial: bool, + ) { + if self.register_topic(topic_hash).is_ok() { // Due to limits, this topic could have not been allowed, so we check. - self.mesh_peer_counts.get_or_create(topic).set(count as i64); + self.mesh_peer_counts + .get_or_create(&MeshPeerLabel { + supports_partial, + topic_hash: topic_hash.to_string(), + }) + .set(count as i64); } } @@ -526,11 +564,14 @@ impl Metrics { } /// Register sending a message over a topic. - pub(crate) fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) { - if self.register_topic(topic).is_ok() { - self.topic_msg_sent_counts.get_or_create(topic).inc(); + pub(crate) fn msg_sent(&mut self, topic_hash: &TopicHash, partial: bool, bytes: usize) { + if self.register_topic(topic_hash).is_ok() { + self.topic_msg_sent_counts.get_or_create(topic_hash).inc(); self.topic_msg_sent_bytes - .get_or_create(topic) + .get_or_create(&RpcSentLabel { + partial, + topic_hash: topic_hash.to_string(), + }) .inc_by(bytes as u64); } } @@ -753,6 +794,21 @@ struct HistBuilder { buckets: Vec, } +/// Label for the total size of publish messages sent via RPC. +#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)] +struct RpcSentLabel { + partial: bool, + topic_hash: String, +} + +/// Label for the mesh peers. +/// Label for the total size of publish messages sent via RPC. +#[derive(PartialEq, Eq, Hash, EncodeLabelSet, Clone, Debug)] +struct MeshPeerLabel { + supports_partial: bool, + topic_hash: String, +} + impl MetricConstructor for HistBuilder { fn new_metric(&self) -> Histogram { Histogram::new(self.buckets.clone()) diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 7a30038c48b..06755e366b2 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -532,6 +532,24 @@ impl PeerScore { } } + /// Indicate that a peer has sent us invalid partial message data. + #[cfg(feature = "partial_messages")] + pub(crate) fn reject_invalid_partial(&mut self, peer_id: PeerId, topic_hash: &TopicHash) { + if let Some(peer_stats) = self.peer_stats.get_mut(&peer_id) { + if let Some(topic_stats) = + peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) + { + tracing::debug!( + peer=%peer_id, + topic=%topic_hash, + "[Penalty] Peer delivered invalid partial data in topic and gets penalized \ + for it", + ); + topic_stats.invalid_message_deliveries += 1f64; + } + } + } + /// Removes an ip from a peer pub(crate) fn remove_ip(&mut self, peer_id: &PeerId, ip: &IpAddr) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 74dcc669f55..dd5f0067ffd 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -29,20 +29,28 @@ use libp2p_identity::{PeerId, PublicKey}; use libp2p_swarm::StreamProtocol; use quick_protobuf::{MessageWrite, Writer}; +#[cfg(feature = "partial_messages")] +use crate::extensions::partial_messages::PartialMessage; use crate::{ config::ValidationMode, handler::HandlerEvent, rpc_proto::proto, topic::TopicHash, types::{ - ControlAction, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, - RawMessage, RpcIn, Subscription, SubscriptionAction, + ControlAction, Extensions, Graft, IDontWant, IHave, IWant, MessageId, PeerInfo, PeerKind, + Prune, RawMessage, RpcIn, Subscription, SubscriptionAction, SubscriptionOpts, + TestExtension, }, ValidationError, }; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; +pub(crate) const GOSSIPSUB_1_3_0_PROTOCOL: ProtocolId = ProtocolId { + protocol: StreamProtocol::new("/meshsub/1.3.0"), + kind: PeerKind::Gossipsubv1_2, +}; + pub(crate) const GOSSIPSUB_1_2_0_PROTOCOL: ProtocolId = ProtocolId { protocol: StreamProtocol::new("/meshsub/1.2.0"), kind: PeerKind::Gossipsubv1_2, @@ -79,6 +87,7 @@ impl Default for ProtocolConfig { Self { validation_mode: ValidationMode::Strict, protocol_ids: vec![ + GOSSIPSUB_1_3_0_PROTOCOL, GOSSIPSUB_1_2_0_PROTOCOL, GOSSIPSUB_1_1_0_PROTOCOL, GOSSIPSUB_1_0_0_PROTOCOL, @@ -556,13 +565,40 @@ impl Decoder for GossipsubCodec { }) .collect(); + let extensions_msg = rpc_control.extensions.map(|extensions| Extensions { + test_extension: extensions.testExtension, + partial_messages: extensions.partialMessages, + }); + control_msgs.extend(ihave_msgs); control_msgs.extend(iwant_msgs); control_msgs.extend(graft_msgs); control_msgs.extend(prune_msgs); control_msgs.extend(idontwant_msgs); + control_msgs.push(ControlAction::Extensions(extensions_msg)); } + #[cfg(feature = "partial_messages")] + let partial_message = rpc.partial.and_then(|partial_proto| { + let Some(topic_id_bytes) = partial_proto.topicID else { + tracing::debug!("Partial message without topic_id, discarding"); + return None; + }; + let topic_hash = TopicHash::from_raw(String::from_utf8_lossy(&topic_id_bytes)); + + let Some(group_id) = partial_proto.groupID else { + tracing::debug!("Partial message without group_id, discarding"); + return None; + }; + + Some(PartialMessage { + topic_hash, + group_id, + metadata: partial_proto.partsMetadata, + body: partial_proto.partialMessage, + }) + }); + Ok(Some(HandlerEvent::Message { rpc: RpcIn { messages, @@ -576,9 +612,16 @@ impl Decoder for GossipsubCodec { SubscriptionAction::Unsubscribe }, topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()), + options: SubscriptionOpts { + requests_partial: sub.requestsPartial.unwrap_or_default(), + supports_partial: sub.supportsPartial.unwrap_or_default(), + }, }) .collect(), control_msgs, + test_extension: rpc.testExtension.map(|_test_extension| TestExtension {}), + #[cfg(feature = "partial_messages")] + partial_message, }, invalid_messages, })) diff --git a/protocols/gossipsub/src/queue.rs b/protocols/gossipsub/src/queue.rs index 971ae801f83..ec39c9bf545 100644 --- a/protocols/gossipsub/src/queue.rs +++ b/protocols/gossipsub/src/queue.rs @@ -62,7 +62,7 @@ impl Queue { /// which will only happen for control and non priority messages. pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box> { match message { - RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => { + RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => { self.priority .try_push(message) .expect("Shared is unbounded"); @@ -74,7 +74,10 @@ impl Queue { RpcOut::Publish { .. } | RpcOut::Forward { .. } | RpcOut::IHave(_) + | RpcOut::TestExtension | RpcOut::IWant(_) => self.non_priority.try_push(message), + #[cfg(feature = "partial_messages")] + RpcOut::PartialMessage(_) => self.non_priority.try_push(message), } } diff --git a/protocols/gossipsub/src/rpc.rs b/protocols/gossipsub/src/rpc.rs index 943df31f599..bc2e82d40f6 100644 --- a/protocols/gossipsub/src/rpc.rs +++ b/protocols/gossipsub/src/rpc.rs @@ -87,11 +87,15 @@ impl Sender { RpcOut::Publish { .. } | RpcOut::Graft(_) | RpcOut::Prune(_) + | RpcOut::Extensions(_) | RpcOut::Subscribe(_) | RpcOut::Unsubscribe(_) => &self.priority_sender, - RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => { - &self.non_priority_sender - } + RpcOut::Forward { .. } + | RpcOut::IHave(_) + | RpcOut::IWant(_) + | RpcOut::IDontWant(_) + | RpcOut::TestExtension + | RpcOut::PartialMessage { .. } => &self.non_priority_sender, }; sender.try_send(rpc).map_err(|err| err.into_inner()) } diff --git a/protocols/gossipsub/src/subscription_filter.rs b/protocols/gossipsub/src/subscription_filter.rs index c051b6c333b..cdb8df3d1f2 100644 --- a/protocols/gossipsub/src/subscription_filter.rs +++ b/protocols/gossipsub/src/subscription_filter.rs @@ -225,22 +225,27 @@ mod test { Subscription { action: Unsubscribe, topic_hash: t1.clone(), + options: Default::default(), }, Subscription { action: Unsubscribe, topic_hash: t2.clone(), + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t2, + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t1.clone(), + options: Default::default(), }, Subscription { action: Unsubscribe, topic_hash: t1, + options: Default::default(), }, ]; @@ -262,10 +267,12 @@ mod test { Subscription { action: Subscribe, topic_hash: t1, + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t2, + options: Default::default(), }, ]; @@ -291,14 +298,17 @@ mod test { Subscription { action: Subscribe, topic_hash: t1.clone(), + options: Default::default(), }, Subscription { action: Unsubscribe, topic_hash: t1.clone(), + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t1, + options: Default::default(), }, ]; @@ -324,10 +334,12 @@ mod test { Subscription { action: Subscribe, topic_hash: t[2].clone(), + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t[3].clone(), + options: Default::default(), }, ]; @@ -353,22 +365,27 @@ mod test { Subscription { action: Subscribe, topic_hash: t[4].clone(), + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t[2].clone(), + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t[3].clone(), + options: Default::default(), }, Subscription { action: Unsubscribe, topic_hash: t[0].clone(), + options: Default::default(), }, Subscription { action: Unsubscribe, topic_hash: t[1].clone(), + options: Default::default(), }, ]; @@ -390,10 +407,12 @@ mod test { Subscription { action: Subscribe, topic_hash: t1, + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t2, + options: Default::default(), }, ]; @@ -416,14 +435,17 @@ mod test { Subscription { action: Subscribe, topic_hash: t1, + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t2, + options: Default::default(), }, Subscription { action: Subscribe, topic_hash: t3, + options: Default::default(), }, ]; diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 53e9fe2c172..adbdf58637f 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -65,7 +65,7 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] #[cfg_attr( feature = "metrics", derive(prometheus_client::encoding::EncodeLabelSet) diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index bea0786e060..10291249bd8 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -91,6 +91,8 @@ impl std::fmt::Debug for MessageId { pub(crate) struct PeerDetails { /// The kind of protocol the peer supports. pub(crate) kind: PeerKind, + /// The Extensions supported by the peer if any. + pub(crate) extensions: Option, /// If the peer is an outbound connection. pub(crate) outbound: bool, /// Its current connections. @@ -110,6 +112,8 @@ pub(crate) struct PeerDetails { derive(prometheus_client::encoding::EncodeLabelValue) )] pub enum PeerKind { + /// A gossipsub 1.3 peer. + Gossipsubv1_3, /// A gossipsub 1.2 peer. Gossipsubv1_2, /// A gossipsub 1.1 peer. @@ -223,6 +227,8 @@ pub struct Subscription { pub action: SubscriptionAction, /// The topic from which to subscribe or unsubscribe. pub topic_hash: TopicHash, + /// Partial options. + pub options: SubscriptionOpts, } /// Action that a subscription wants to perform. @@ -234,6 +240,13 @@ pub enum SubscriptionAction { Unsubscribe, } +/// Partial options when subscribing a topic. +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash)] +pub struct SubscriptionOpts { + pub(crate) requests_partial: bool, + pub(crate) supports_partial: bool, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub(crate) struct PeerInfo { pub(crate) peer_id: Option, @@ -257,6 +270,8 @@ pub enum ControlAction { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// The Node has sent us its supported extensions. + Extensions(Option), } /// Node broadcasts known messages per topic - IHave control message. @@ -300,6 +315,16 @@ pub struct IDontWant { pub(crate) message_ids: Vec, } +/// The node has sent us the supported Gossipsub Extensions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Extensions { + pub(crate) test_extension: Option, + pub(crate) partial_messages: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TestExtension {} + /// A Gossipsub RPC message sent. #[derive(Debug)] pub enum RpcOut { @@ -318,7 +343,11 @@ pub enum RpcOut { timeout: Delay, }, /// Subscribe a topic. - Subscribe(TopicHash), + Subscribe { + topic: TopicHash, + requests_partial: bool, + supports_partial: bool, + }, /// Unsubscribe a topic. Unsubscribe(TopicHash), /// Send a GRAFT control message. @@ -332,6 +361,13 @@ pub enum RpcOut { /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant /// control message. IDontWant(IDontWant), + /// Send a Extensions control message. + Extensions(Extensions), + /// Send a test extension message. + TestExtension, + /// Send a partial messages extension. + #[cfg(feature = "partial_messages")] + PartialMessage(crate::partial_messages::PartialMessage), } impl RpcOut { @@ -345,7 +381,7 @@ impl RpcOut { pub(crate) fn priority(&self) -> bool { matches!( self, - RpcOut::Subscribe(_) + RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) | RpcOut::Graft(_) | RpcOut::Prune(_) @@ -362,27 +398,43 @@ impl From for proto::RPC { subscriptions: Vec::new(), publish: vec![message.into()], control: None, + testExtension: None, + partial: None, }, RpcOut::Forward { message, .. } => proto::RPC { publish: vec![message.into()], subscriptions: Vec::new(), control: None, + testExtension: None, + partial: None, }, - RpcOut::Subscribe(topic) => proto::RPC { + RpcOut::Subscribe { + topic, + requests_partial, + supports_partial, + } => proto::RPC { publish: Vec::new(), subscriptions: vec![proto::SubOpts { subscribe: Some(true), topic_id: Some(topic.into_string()), + requestsPartial: Some(requests_partial), + supportsPartial: Some(supports_partial), }], control: None, + testExtension: None, + partial: None, }, RpcOut::Unsubscribe(topic) => proto::RPC { publish: Vec::new(), subscriptions: vec![proto::SubOpts { subscribe: Some(false), topic_id: Some(topic.into_string()), + requestsPartial: None, + supportsPartial: None, }], control: None, + testExtension: None, + partial: None, }, RpcOut::IHave(IHave { topic_hash, @@ -399,7 +451,10 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, + partial: None, }, RpcOut::IWant(IWant { message_ids }) => proto::RPC { publish: Vec::new(), @@ -412,7 +467,10 @@ impl From for proto::RPC { graft: vec![], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, + partial: None, }, RpcOut::Graft(Graft { topic_hash }) => proto::RPC { publish: Vec::new(), @@ -425,7 +483,10 @@ impl From for proto::RPC { }], prune: vec![], idontwant: vec![], + extensions: None, }), + testExtension: None, + partial: None, }, RpcOut::Prune(Prune { topic_hash, @@ -452,7 +513,10 @@ impl From for proto::RPC { backoff, }], idontwant: vec![], + extensions: None, }), + testExtension: None, + partial: None, } } RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { @@ -466,6 +530,54 @@ impl From for proto::RPC { idontwant: vec![proto::ControlIDontWant { message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }], + extensions: None, + }), + testExtension: None, + partial: None, + }, + RpcOut::Extensions(Extensions { + partial_messages, + test_extension, + }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: Some(proto::ControlExtensions { + testExtension: test_extension, + partialMessages: partial_messages, + }), + }), + testExtension: None, + partial: None, + }, + RpcOut::TestExtension => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: Some(proto::TestExtension {}), + partial: None, + }, + #[cfg(feature = "partial_messages")] + RpcOut::PartialMessage(crate::partial_messages::PartialMessage { + topic_hash, + group_id, + metadata, + body, + }) => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: None, + partial: Some(proto::PartialMessagesExtension { + topicID: Some(topic_hash.as_str().as_bytes().to_vec()), + groupID: Some(group_id), + partialMessage: body, + partsMetadata: metadata, }), }, } @@ -481,6 +593,11 @@ pub struct RpcIn { pub subscriptions: Vec, /// List of Gossipsub control messages. pub control_msgs: Vec, + /// Gossipsub test extension. + pub test_extension: Option, + /// Partial messages extension. + #[cfg(feature = "partial_messages")] + pub partial_message: Option, } impl fmt::Debug for RpcIn { @@ -495,6 +612,9 @@ impl fmt::Debug for RpcIn { if !self.control_msgs.is_empty() { b.field("control_msgs", &self.control_msgs); } + #[cfg(feature = "partial_messages")] + b.field("partial_messages", &self.partial_message); + b.finish() } } @@ -507,6 +627,7 @@ impl PeerKind { Self::Gossipsub => "Gossipsub v1.0", Self::Gossipsubv1_1 => "Gossipsub v1.1", Self::Gossipsubv1_2 => "Gossipsub v1.2", + Self::Gossipsubv1_3 => "Gossipsub v1.3", } } } diff --git a/types.rs b/types.rs new file mode 100644 index 00000000000..5f7c0618c33 --- /dev/null +++ b/types.rs @@ -0,0 +1,710 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A collection of types using the Gossipsub system. +#[cfg(feature = "partial_messages")] +use std::collections::HashMap; +use std::{ + collections::BTreeSet, + fmt::{self, Debug}, +}; + +use futures_timer::Delay; +use hashlink::LinkedHashMap; +use libp2p_identity::PeerId; +use libp2p_swarm::ConnectionId; +use quick_protobuf::MessageWrite; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use web_time::Instant; + +use crate::{queue::Queue, rpc_proto::proto, TopicHash}; + +/// Messages that have expired while attempting to be sent to a peer. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct FailedMessages { + /// The number of messages that were failed to be sent to the priority queue as it was + /// full. + pub priority: usize, + /// The number of messages that were failed to be sent to the non priority queue as it was + /// full. + pub non_priority: usize, +} + +#[derive(Debug)] +/// Validation kinds from the application for received messages. +pub enum MessageAcceptance { + /// The message is considered valid, and it should be delivered and forwarded to the network. + Accept, + /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty. + Reject, + /// The message is neither delivered nor forwarded to the network, but the router does not + /// trigger the P₄ penalty. + Ignore, +} + +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MessageId(pub Vec); + +impl MessageId { + pub fn new(value: &[u8]) -> Self { + Self(value.to_vec()) + } +} + +impl>> From for MessageId { + fn from(value: T) -> Self { + Self(value.into()) + } +} + +impl std::fmt::Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex_fmt::HexFmt(&self.0)) + } +} + +impl std::fmt::Debug for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0)) + } +} + +#[derive(Debug)] +/// Connected peer details. +pub(crate) struct PeerDetails { + /// The kind of protocol the peer supports. + pub(crate) kind: PeerKind, + /// The Extensions supported by the peer if any. + pub(crate) extensions: Option, + /// If the peer is an outbound connection. + pub(crate) outbound: bool, + /// Its current connections. + pub(crate) connections: Vec, + /// Subscribed topics. + pub(crate) topics: BTreeSet, + /// Don't send messages. + pub(crate) dont_send: LinkedHashMap, + + /// Message queue consumed by the connection handler. + pub(crate) messages: Queue, + + /// Peer Partial messages. + #[cfg(feature = "partial_messages")] + pub(crate) partial_messages: HashMap, PartialData>>, + + /// Partial only subscribed topics. + #[cfg(feature = "partial_messages")] + pub(crate) partial_only_topics: BTreeSet, +} + +/// Stored `Metadata` for a peer. +#[cfg(feature = "partial_messages")] +#[derive(Debug)] +pub(crate) enum PeerMetadata { + Remote(Vec), + Local(Box), +} + +#[cfg(feature = "partial_messages")] +impl AsRef<[u8]> for PeerMetadata { + fn as_ref(&self) -> &[u8] { + match self { + PeerMetadata::Remote(metadata) => metadata, + PeerMetadata::Local(metadata) => metadata.as_slice(), + } + } +} + +/// The partial message data the peer has. +#[cfg(feature = "partial_messages")] +#[derive(Debug)] +pub(crate) struct PartialData { + /// The current peer partial metadata. + pub(crate) metadata: Option, + /// The remaining heartbeats for this message to be deleted. + pub(crate) ttl: usize, +} + +#[cfg(feature = "partial_messages")] +impl Default for PartialData { + fn default() -> Self { + Self { + metadata: Default::default(), + ttl: 5, + } + } +} + +/// Describes the types of peers that can exist in the gossipsub context. +#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)] +#[cfg_attr( + feature = "metrics", + derive(prometheus_client::encoding::EncodeLabelValue) +)] +pub enum PeerKind { + /// A gossipsub 1.3 peer. + Gossipsubv1_3, + /// A gossipsub 1.2 peer. + Gossipsubv1_2, + /// A gossipsub 1.1 peer. + Gossipsubv1_1, + /// A gossipsub 1.0 peer. + Gossipsub, + /// A floodsub peer. + Floodsub, + /// The peer doesn't support any of the protocols. + NotSupported, +} + +/// A message received by the gossipsub system and stored locally in caches.. +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct RawMessage { + /// Id of the peer that published this message. + pub source: Option, + + /// Content of the message. Its meaning is out of scope of this library. + pub data: Vec, + + /// A random sequence number. + pub sequence_number: Option, + + /// The topic this message belongs to + pub topic: TopicHash, + + /// The signature of the message if it's signed. + pub signature: Option>, + + /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined. + pub key: Option>, + + /// Flag indicating if this message has been validated by the application or not. + pub validated: bool, +} + +impl PeerKind { + /// Returns true if peer speaks any gossipsub version. + pub(crate) fn is_gossipsub(&self) -> bool { + matches!( + self, + Self::Gossipsubv1_2 | Self::Gossipsubv1_1 | Self::Gossipsub + ) + } +} + +impl RawMessage { + /// Calculates the encoded length of this message (used for calculating metrics). + pub fn raw_protobuf_len(&self) -> usize { + let message = proto::Message { + from: self.source.map(|m| m.to_bytes()), + data: Some(self.data.clone()), + seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()), + topic: TopicHash::into_string(self.topic.clone()), + signature: self.signature.clone(), + key: self.key.clone(), + }; + message.get_size() + } +} + +impl From for proto::Message { + fn from(raw: RawMessage) -> Self { + proto::Message { + from: raw.source.map(|m| m.to_bytes()), + data: Some(raw.data), + seqno: raw.sequence_number.map(|s| s.to_be_bytes().to_vec()), + topic: TopicHash::into_string(raw.topic), + signature: raw.signature, + key: raw.key, + } + } +} + +/// The message sent to the user after a [`RawMessage`] has been transformed by a +/// [`crate::DataTransform`]. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct Message { + /// Id of the peer that published this message. + pub source: Option, + + /// Content of the message. + pub data: Vec, + + /// A random sequence number. + pub sequence_number: Option, + + /// The topic this message belongs to + pub topic: TopicHash, +} + +impl fmt::Debug for Message { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Message") + .field( + "data", + &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)), + ) + .field("source", &self.source) + .field("sequence_number", &self.sequence_number) + .field("topic", &self.topic) + .finish() + } +} + +/// A subscription received by the gossipsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Subscription { + /// Action to perform. + pub action: SubscriptionAction, + /// The topic from which to subscribe or unsubscribe. + pub topic_hash: TopicHash, + /// Peer only wants to receive partial messages instead of full messages. + #[cfg(feature = "partial_messages")] + pub partial: bool, +} + +/// Action that a subscription wants to perform. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SubscriptionAction { + /// The remote wants to subscribe to the given topic. + Subscribe, + /// The remote wants to unsubscribe from the given topic. + Unsubscribe, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct PeerInfo { + pub(crate) peer_id: Option, + // TODO add this when RFC: Signed Address Records got added to the spec (see pull request + // https://github.com/libp2p/specs/pull/217) + // pub signed_peer_record: ?, +} + +/// A Control message received by the gossipsub system. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum ControlAction { + /// Node broadcasts known messages per topic - IHave control message. + IHave(IHave), + /// The node requests specific message ids (peer_id + sequence _number) - IWant control + /// message. + IWant(IWant), + /// The node has been added to the mesh - Graft control message. + Graft(Graft), + /// The node has been removed from the mesh - Prune control message. + Prune(Prune), + /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant + /// control message. + IDontWant(IDontWant), + /// The Node has sent us its supported extensions. + Extensions(Option), +} + +/// Node broadcasts known messages per topic - IHave control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IHave { + /// The topic of the messages. + pub(crate) topic_hash: TopicHash, + /// A list of known message ids (peer_id + sequence _number) as a string. + pub(crate) message_ids: Vec, +} + +/// The node requests specific message ids (peer_id + sequence _number) - IWant control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IWant { + /// A list of known message ids (peer_id + sequence _number) as a string. + pub(crate) message_ids: Vec, +} + +/// The node has been added to the mesh - Graft control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Graft { + /// The mesh topic the peer should be added to. + pub(crate) topic_hash: TopicHash, +} + +/// The node has been removed from the mesh - Prune control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Prune { + /// The mesh topic the peer should be removed from. + pub(crate) topic_hash: TopicHash, + /// A list of peers to be proposed to the removed peer as peer exchange + pub(crate) peers: Vec, + /// The backoff time in seconds before we allow to reconnect + pub(crate) backoff: Option, +} + +/// The node requests us to not forward message ids - IDontWant control message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct IDontWant { + /// A list of known message ids. + pub(crate) message_ids: Vec, +} + +/// A received partial message. +#[cfg(feature = "partial_messages")] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PartialMessage { + /// The topic ID this partial message belongs to. + pub topic_id: TopicHash, + /// The group ID that identifies the complete logical message. + pub group_id: Vec, + /// The partial metadata we have and we want. + pub metadata: Option>, + /// The partial message itself. + pub message: Option>, +} + +/// The node has sent us the supported Gossipsub Extensions. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Extensions { + pub(crate) test_extension: Option, + pub(crate) partial_messages: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TestExtension {} + +/// A Gossipsub RPC message sent. +#[derive(Debug)] +pub enum RpcOut { + /// Publish a Gossipsub message on network.`timeout` limits the duration the message + /// can wait to be sent before it is abandoned. + Publish { + message_id: MessageId, + message: RawMessage, + timeout: Delay, + }, + /// Forward a Gossipsub message on network. `timeout` limits the duration the message + /// can wait to be sent before it is abandoned. + Forward { + message_id: MessageId, + message: RawMessage, + timeout: Delay, + }, + /// Subscribe a topic. + Subscribe { + topic: TopicHash, + #[cfg(feature = "partial_messages")] + requests_partials: bool, + }, + /// Unsubscribe a topic. + Unsubscribe(TopicHash), + /// Send a GRAFT control message. + Graft(Graft), + /// Send a PRUNE control message. + Prune(Prune), + /// Send a IHave control message. + IHave(IHave), + /// Send a IWant control message. + IWant(IWant), + /// The node requests us to not forward message ids (peer_id + sequence _number) - IDontWant + /// control message. + IDontWant(IDontWant), + /// Send a Extensions control message. + Extensions(Extensions), + /// Send a test extension message. + TestExtension, + /// Send a partial messages extension. + PartialMessage { + /// The group ID that identifies the complete logical message. + group_id: Vec, + /// The topic ID this partial message belongs to. + topic_id: TopicHash, + /// The partial message itself. + message: Option>, + /// The partial metadata we have and want. + metadata: Vec, + }, +} + +impl RpcOut { + /// Converts the GossipsubRPC into its protobuf format. + // A convenience function to avoid explicitly specifying types. + pub fn into_protobuf(self) -> proto::RPC { + self.into() + } + + /// Returns true if the `RpcOut` is priority. + pub(crate) fn priority(&self) -> bool { + matches!( + self, + RpcOut::Subscribe { .. } + | RpcOut::Unsubscribe(_) + | RpcOut::Graft(_) + | RpcOut::Prune(_) + | RpcOut::IDontWant(_) + ) + } +} + +impl From for proto::RPC { + /// Converts the RPC into protobuf format. + fn from(rpc: RpcOut) -> Self { + match rpc { + RpcOut::Publish { message, .. } => proto::RPC { + subscriptions: Vec::new(), + publish: vec![message.into()], + control: None, + testExtension: None, + partial: None, + }, + RpcOut::Forward { message, .. } => proto::RPC { + publish: vec![message.into()], + subscriptions: Vec::new(), + control: None, + testExtension: None, + partial: None, + }, + RpcOut::Subscribe { + topic, + #[cfg(feature = "partial_messages")] + requests_partials: partial_only, + } => proto::RPC { + publish: Vec::new(), + subscriptions: vec![proto::SubOpts { + subscribe: Some(true), + topic_id: Some(topic.into_string()), + #[cfg(not(feature = "partial_messages"))] + partial: None, + #[cfg(feature = "partial_messages")] + requestsPartial: Some(partial_only), + }], + control: None, + testExtension: None, + partial: None, + }, + RpcOut::Unsubscribe(topic) => proto::RPC { + publish: Vec::new(), + subscriptions: vec![proto::SubOpts { + subscribe: Some(false), + topic_id: Some(topic.into_string()), + requestsPartial: None, + }], + control: None, + testExtension: None, + partial: None, + }, + RpcOut::IHave(IHave { + topic_hash, + message_ids, + }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![proto::ControlIHave { + topic_id: Some(topic_hash.into_string()), + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: None, + }), + testExtension: None, + partial: None, + }, + RpcOut::IWant(IWant { message_ids }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![proto::ControlIWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: None, + }), + testExtension: None, + partial: None, + }, + RpcOut::Graft(Graft { topic_hash }) => proto::RPC { + publish: Vec::new(), + subscriptions: vec![], + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![proto::ControlGraft { + topic_id: Some(topic_hash.into_string()), + }], + prune: vec![], + idontwant: vec![], + extensions: None, + }), + testExtension: None, + partial: None, + }, + RpcOut::Prune(Prune { + topic_hash, + peers, + backoff, + }) => { + proto::RPC { + publish: Vec::new(), + subscriptions: vec![], + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![proto::ControlPrune { + topic_id: Some(topic_hash.into_string()), + peers: peers + .into_iter() + .map(|info| proto::PeerInfo { + peer_id: info.peer_id.map(|id| id.to_bytes()), + // TODO, see https://github.com/libp2p/specs/pull/217 + signed_peer_record: None, + }) + .collect(), + backoff, + }], + idontwant: vec![], + extensions: None, + }), + testExtension: None, + partial: None, + } + } + RpcOut::IDontWant(IDontWant { message_ids }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![proto::ControlIDontWant { + message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), + }], + extensions: None, + }), + testExtension: None, + partial: None, + }, + RpcOut::Extensions(Extensions { + partial_messages, + test_extension, + }) => proto::RPC { + publish: Vec::new(), + subscriptions: Vec::new(), + control: Some(proto::ControlMessage { + ihave: vec![], + iwant: vec![], + graft: vec![], + prune: vec![], + idontwant: vec![], + extensions: Some(proto::ControlExtensions { + testExtension: test_extension, + partialMessages: partial_messages, + }), + }), + testExtension: None, + partial: None, + }, + RpcOut::TestExtension => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: Some(proto::TestExtension {}), + partial: None, + }, + RpcOut::PartialMessage { + topic_id, + group_id, + metadata, + message, + } => proto::RPC { + subscriptions: vec![], + publish: vec![], + control: None, + testExtension: None, + partial: Some(proto::PartialMessagesExtension { + topicID: Some(topic_id.as_str().as_bytes().to_vec()), + groupID: Some(group_id), + partialMessage: message, + partsMetadata: Some(metadata), + }), + }, + } + } +} + +/// A Gossipsub RPC message received. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct RpcIn { + /// List of messages that were part of this RPC query. + pub messages: Vec, + /// List of subscriptions. + pub subscriptions: Vec, + /// List of Gossipsub control messages. + pub control_msgs: Vec, + /// Gossipsub test extension. + pub test_extension: Option, + /// Partial messages extension. + #[cfg(feature = "partial_messages")] + pub partial_message: Option, +} + +impl fmt::Debug for RpcIn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut b = f.debug_struct("GossipsubRpc"); + if !self.messages.is_empty() { + b.field("messages", &self.messages); + } + if !self.subscriptions.is_empty() { + b.field("subscriptions", &self.subscriptions); + } + if !self.control_msgs.is_empty() { + b.field("control_msgs", &self.control_msgs); + } + #[cfg(feature = "partial_messages")] + b.field("partial_messages", &self.partial_message); + + b.finish() + } +} + +impl PeerKind { + pub fn as_static_ref(&self) -> &'static str { + match self { + Self::NotSupported => "Not Supported", + Self::Floodsub => "Floodsub", + Self::Gossipsub => "Gossipsub v1.0", + Self::Gossipsubv1_1 => "Gossipsub v1.1", + Self::Gossipsubv1_2 => "Gossipsub v1.2", + Self::Gossipsubv1_3 => "Gossipsub v1.3", + } + } +} + +impl AsRef for PeerKind { + fn as_ref(&self) -> &str { + self.as_static_ref() + } +} + +impl fmt::Display for PeerKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_ref()) + } +}