Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 26 additions & 26 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
gossip_engine: GossipEngine<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
validator: Arc<GossipValidator<B>>,

/// Sender side of the neighbor packet channel.
Expand Down Expand Up @@ -185,12 +185,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
);

let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new(
let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME,
validator.clone()
);
)));

{
// register all previous votes with the gossip service so that they're
Expand All @@ -214,7 +214,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}
);

gossip_engine.register_gossip_message(
gossip_engine.lock().register_gossip_message(
topic,
message.encode(),
);
Expand Down Expand Up @@ -293,7 +293,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
});

let topic = round_topic::<B>(round.0, set_id.0);
let incoming = self.gossip_engine.messages_for(topic)
let incoming = self.gossip_engine.lock().messages_for(topic)
.filter_map(move |notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);

Expand Down Expand Up @@ -422,11 +422,11 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
type Output = Result<(), Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll::Ready(Some((to, packet))) => {
self.gossip_engine.send_message(to, packet.encode());
self.gossip_engine.lock().send_message(to, packet.encode());
},
Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Neighbor packet worker stream closed.".into()))
Expand All @@ -438,7 +438,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
loop {
match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
self.gossip_engine.report(who, cost_benefit);
self.gossip_engine.lock().report(who, cost_benefit);
},
Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Gossip validator report stream closed.".into()))
Expand All @@ -447,7 +447,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
}
}

match self.gossip_engine.poll_unpin(cx) {
match self.gossip_engine.lock().poll_unpin(cx) {
// The gossip engine future finished. We should do the same.
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {},
Expand All @@ -458,7 +458,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
}

fn incoming_global<B: BlockT>(
mut gossip_engine: GossipEngine<B>,
gossip_engine: Arc<Mutex<GossipEngine<B>>>,
topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>,
Expand All @@ -467,7 +467,7 @@ fn incoming_global<B: BlockT>(
let process_commit = move |
msg: FullCommitMessage<B>,
mut notification: sc_network_gossip::TopicNotification,
gossip_engine: &mut GossipEngine<B>,
gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
Expand All @@ -491,7 +491,7 @@ fn incoming_global<B: BlockT>(
msg.set_id,
) {
if let Some(who) = notification.sender {
gossip_engine.report(who, cost);
gossip_engine.lock().report(who, cost);
}

return None;
Expand All @@ -513,12 +513,12 @@ fn incoming_global<B: BlockT>(
|to, neighbor| neighbor_sender.send(to, neighbor),
);

gossip_engine.gossip_message(topic, notification.message.clone(), false);
gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
}
voter::CommitProcessingOutcome::Bad(_) => {
// report peer and do not gossip.
if let Some(who) = notification.sender.take() {
gossip_engine.report(who, cost::INVALID_COMMIT);
gossip_engine.lock().report(who, cost::INVALID_COMMIT);
}
}
};
Expand All @@ -531,7 +531,7 @@ fn incoming_global<B: BlockT>(
let process_catch_up = move |
msg: FullCatchUpMessage<B>,
mut notification: sc_network_gossip::TopicNotification,
gossip_engine: &mut GossipEngine<B>,
gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>,
| {
Expand All @@ -544,7 +544,7 @@ fn incoming_global<B: BlockT>(
msg.set_id,
) {
if let Some(who) = notification.sender {
gossip_engine.report(who, cost);
gossip_engine.lock().report(who, cost);
}

return None;
Expand All @@ -554,7 +554,7 @@ fn incoming_global<B: BlockT>(
if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
// report peer
if let Some(who) = notification.sender.take() {
gossip_engine.report(who, cost::INVALID_CATCH_UP);
gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
}
}

Expand All @@ -566,7 +566,7 @@ fn incoming_global<B: BlockT>(
Some(voter::CommunicationIn::CatchUp(msg.message, cb))
};

gossip_engine.messages_for(topic)
gossip_engine.clone().lock().messages_for(topic)
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
Expand All @@ -578,9 +578,9 @@ fn incoming_global<B: BlockT>(
.filter_map(move |(notification, msg)| {
future::ready(match msg {
GossipMessage::Commit(msg) =>
process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
process_commit(msg, notification, &gossip_engine, &gossip_validator, &*voters),
GossipMessage::CatchUp(msg) =>
process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters),
process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &*voters),
_ => {
debug!(target: "afg", "Skipping unknown message type");
None
Expand Down Expand Up @@ -688,7 +688,7 @@ pub(crate) struct OutgoingMessages<Block: BlockT> {
set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>,
network: Arc<Mutex<GossipEngine<Block>>>,
has_voted: HasVoted<Block>,
}

Expand Down Expand Up @@ -754,11 +754,11 @@ impl<Block: BlockT> Sink<Message<Block>> for OutgoingMessages<Block>
);

// announce the block we voted on to our peers.
self.network.announce(target_hash, Vec::new());
self.network.lock().announce(target_hash, Vec::new());

// propagate the message to peers
let topic = round_topic::<Block>(self.round, self.set_id);
self.network.gossip_message(topic, message.encode(), false);
self.network.lock().gossip_message(topic, message.encode(), false);

// forward the message to the inner sender.
return self.sender.start_send(signed).map_err(|e| {
Expand Down Expand Up @@ -959,7 +959,7 @@ fn check_catch_up<Block: BlockT>(

/// An output sink for commit messages.
struct CommitsOut<Block: BlockT> {
network: GossipEngine<Block>,
network: Arc<Mutex<GossipEngine<Block>>>,
set_id: SetId,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
Expand All @@ -969,7 +969,7 @@ struct CommitsOut<Block: BlockT> {
impl<Block: BlockT> CommitsOut<Block> {
/// Create a new commit output stream.
pub(crate) fn new(
network: GossipEngine<Block>,
network: Arc<Mutex<GossipEngine<Block>>>,
set_id: SetIdNumber,
is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>,
Expand Down Expand Up @@ -1028,7 +1028,7 @@ impl<Block: BlockT> Sink<(RoundNumber, Commit<Block>)> for CommitsOut<Block> {
commit.target_number,
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);
self.network.gossip_message(topic, message.encode(), false);
self.network.lock().gossip_message(topic, message.encode(), false);

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion client/network-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ futures-timer = "3.0.1"
libp2p = { version = "0.16.1", default-features = false, features = ["libp2p-websocket"] }
log = "0.4.8"
lru = "0.1.2"
parking_lot = "0.10.0"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line probably summarizes the pull request the best.

sc-network = { version = "0.8.0-dev", path = "../network" }
sp-runtime = { version = "2.0.0-dev", path = "../../primitives/runtime" }
wasm-timer = "0.2"
Loading