Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@
//! light-client-related requests for information about the state. Each request is the encoding of
//! a `light::Request` and each response is the encoding of a `light::Response`, as defined in the
//! `light.v1.proto` file in this source tree.
//! - **`/<protocol-id>/transactions/1`** is a notifications protocol (see below) where
//! transactions are pushed to other nodes. The handshake is empty on both sides. The message
//! format is a SCALE-encoded list of transactions, where each transaction is an opaque list of
//! bytes.
//! - **`/<protocol-id>/block-announces/1`** is a notifications protocol (see below) where
//! block announces are pushed to other nodes. The handshake is empty on both sides. The message
//! format is a SCALE-encoded tuple containing a block header followed with an opaque list of
//! bytes containing some data associated with this block announcement, e.g. a candidate message.
//! - Notifications protocols that are registered using the `register_notifications_protocol`
//! method. For example: `/paritytech/grandpa/1`. See below for more information.
//!
Expand Down
151 changes: 123 additions & 28 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use sp_runtime::traits::{
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::Message as GenericMessage;
use message::generic::{Message as GenericMessage, ConsensusMessage};
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
Expand Down Expand Up @@ -221,8 +221,12 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
behaviour: GenericProto,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
/// For each protocol name, the legacy gossiping engine ID.
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
/// For each protocol name, the legacy equivalent.
legacy_equiv_by_name: HashMap<Cow<'static, [u8]>, Fallback>,
/// Name of the protocol used for transactions.
transactions_protocol: Cow<'static, [u8]>,
/// Name of the protocol used for block announces.
block_announces_protocol: Cow<'static, [u8]>,
/// Prometheus metrics.
metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes.
Expand Down Expand Up @@ -424,6 +428,17 @@ impl Default for ProtocolConfig {
}
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
/// Use a `Message::Consensus` with the given engine ID.
Consensus(ConsensusEngineId),
/// The message is the bytes encoding of a `Transactions<E>` (which is itself defined as a `Vec<E>`).
Transactions,
/// The message is the bytes encoding of a `BlockAnnounce<H>`.
BlockAnnounce,
}

impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Create a new instance.
pub fn new(
Expand Down Expand Up @@ -460,7 +475,27 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {

let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let behaviour = GenericProto::new(protocol_id, versions, peerset);
let mut behaviour = GenericProto::new(protocol_id.clone(), versions, peerset);

let mut legacy_equiv_by_name = HashMap::new();

let transactions_protocol: Cow<'static, [u8]> = Cow::from({
let mut proto = b"/".to_vec();
proto.extend(protocol_id.as_bytes());
proto.extend(b"/transactions/1");
proto
});
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);

let block_announces_protocol: Cow<'static, [u8]> = Cow::from({
let mut proto = b"/".to_vec();
proto.extend(protocol_id.as_bytes());
proto.extend(b"/block-announces/1");
proto
});
behaviour.register_notif_protocol(block_announces_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
Expand All @@ -481,7 +516,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
peerset_handle: peerset_handle.clone(),
behaviour,
protocol_name_by_engine: HashMap::new(),
protocol_engine_by_name: HashMap::new(),
legacy_equiv_by_name,
transactions_protocol,
block_announces_protocol,
metrics: if let Some(r) = metrics_registry {
Some(Metrics::register(r)?)
} else {
Expand Down Expand Up @@ -731,12 +768,18 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
}

fn send_message(&mut self, who: &PeerId, message: Message<B>) {
fn send_message(
&mut self,
who: &PeerId,
message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
legacy: Message<B>,
) {
send_message::<B>(
&mut self.behaviour,
&mut self.context_data.stats,
who,
message,
legacy,
);
}

Expand Down Expand Up @@ -793,11 +836,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

fn on_block_request(
&mut self,
peer: PeerId,
request: message::BlockRequest<B>
) {
fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
request.id,
peer,
Expand Down Expand Up @@ -874,7 +913,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
blocks: blocks,
};
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
self.send_message(&peer, GenericMessage::BlockResponse(response))
self.send_message(&peer, None, GenericMessage::BlockResponse(response))
}

/// Adjusts the reputation of a node.
Expand Down Expand Up @@ -1132,10 +1171,15 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
&mut self,
target: PeerId,
engine_id: ConsensusEngineId,
message: impl Into<Vec<u8>>
message: impl Into<Vec<u8>>,
) {
if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
self.behaviour.write_notification(&target, engine_id, protocol_name.clone(), message);
let message = message.into();
let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
engine_id,
data: message.clone(),
}).encode();
self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback);
} else {
error!(
target: "sub-libp2p",
Expand All @@ -1158,8 +1202,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
} else {
self.behaviour.register_notif_protocol(protocol_name.clone(), engine_id, Vec::new());
self.protocol_engine_by_name.insert(protocol_name, engine_id);
self.behaviour.register_notif_protocol(protocol_name.clone(), Vec::new());
self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
}

// Registering a protocol while we already have open connections isn't great, but for now
Expand Down Expand Up @@ -1229,7 +1273,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn do_propagate_extrinsics(
&mut self,
extrinsics: &[(H, B::Extrinsic)],
) -> HashMap<H, Vec<String>> {
) -> HashMap<H, Vec<String>> {
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
// never send extrinsics to the light node
Expand All @@ -1251,10 +1295,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
let encoded = to_send.encode();
send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
Some((self.transactions_protocol.clone(), encoded)),
GenericMessage::Transactions(to_send)
)
}
Expand Down Expand Up @@ -1309,7 +1355,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
let inserted = peer.known_blocks.insert(hash);
if inserted || force {
let message: Message<B> = GenericMessage::BlockAnnounce(message::BlockAnnounce {
let message = message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Expand All @@ -1325,13 +1371,16 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
} else {
None
},
});
};

let encoded = message.encode();

send_message::<B> (
&mut self.behaviour,
&mut self.context_data.stats,
&who,
message,
Some((self.block_announces_protocol.clone(), encoded)),
Message::<B>::BlockAnnounce(message),
)
}
}
Expand All @@ -1350,10 +1399,14 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
};

self.send_message(&who, GenericMessage::Status(status))
self.send_message(&who, None, GenericMessage::Status(status))
}

fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce<B::Header>) -> CustomMessageOutcome<B> {
fn on_block_announce(
&mut self,
who: PeerId,
announce: BlockAnnounce<B::Header>,
) -> CustomMessageOutcome<B> {
let hash = announce.header.hash();
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
Expand Down Expand Up @@ -1468,6 +1521,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {

self.send_message(
&who,
None,
GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
id: request.id,
proof,
Expand Down Expand Up @@ -1598,6 +1652,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
self.send_message(
&who,
None,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
Expand Down Expand Up @@ -1662,6 +1717,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
self.send_message(
&who,
None,
GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id,
proof,
Expand Down Expand Up @@ -1702,6 +1758,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
self.send_message(
&who,
None,
GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
id: request.id,
header,
Expand Down Expand Up @@ -1772,6 +1829,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
self.send_message(
&who,
None,
GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
id: request.id,
max: proof.max_block,
Expand Down Expand Up @@ -1822,6 +1880,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};
self.send_message(
&who,
None,
GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
id: 0,
block: request.block,
Expand Down Expand Up @@ -1951,20 +2010,25 @@ fn send_request<B: BlockT, H: ExHashT>(
peer.block_request = Some((Instant::now(), r.clone()));
}
}
send_message::<B>(behaviour, stats, who, message)
send_message::<B>(behaviour, stats, who, None, message)
}

fn send_message<B: BlockT>(
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId,
message: Message<B>,
message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
legacy_message: Message<B>,
) {
let encoded = message.encode();
let mut stats = stats.entry(message.id()).or_default();
let encoded = legacy_message.encode();
let mut stats = stats.entry(legacy_message.id()).or_default();
stats.bytes_out += encoded.len() as u64;
stats.count_out += 1;
behaviour.send_packet(who, encoded);
if let Some((proto, msg)) = message {
behaviour.write_notification(who, proto, msg, encoded);
} else {
behaviour.send_packet(who, encoded);
}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
Expand Down Expand Up @@ -2061,8 +2125,39 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id.clone())
},
GenericProtoOut::CustomMessage { peer_id, message } =>
GenericProtoOut::LegacyMessage { peer_id, message } =>
self.on_custom_message(peer_id, message),
GenericProtoOut::Notification { peer_id, protocol_name, message } =>
match self.legacy_equiv_by_name.get(&protocol_name) {
Some(Fallback::Consensus(engine_id)) => {
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
messages: vec![(*engine_id, message.freeze())],
}
}
Some(Fallback::Transactions) => {
if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
self.on_extrinsics(peer_id, m);
} else {
warn!(target: "sub-libp2p", "Failed to decode transactions list");
}
CustomMessageOutcome::None
}
Some(Fallback::BlockAnnounce) => {
if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
let outcome = self.on_block_announce(peer_id.clone(), announce);
self.update_peer_info(&peer_id);
outcome
} else {
warn!(target: "sub-libp2p", "Failed to decode block announce");
CustomMessageOutcome::None
}
}
None => {
error!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
CustomMessageOutcome::None
}
}
GenericProtoOut::Clogged { peer_id, messages } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
Expand Down
Loading