diff --git a/client/beefy/rpc/src/lib.rs b/client/beefy/rpc/src/lib.rs index 4c1bc03e222e7..596e784ebf528 100644 --- a/client/beefy/rpc/src/lib.rs +++ b/client/beefy/rpc/src/lib.rs @@ -23,9 +23,9 @@ use parking_lot::RwLock; use std::sync::Arc; -use sp_runtime::traits::Block as BlockT; +use sp_runtime::traits::{Block as BlockT, NumberFor}; -use futures::{task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt}; +use futures::{future, task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; use log::warn; @@ -118,7 +118,7 @@ pub trait BeefyApi { /// Implements the BeefyApi RPC trait for interacting with BEEFY. pub struct BeefyRpcHandler { signed_commitment_stream: BeefySignedCommitmentStream, - beefy_best_block: Arc>>, + beefy_best_block: Arc)>>>, manager: SubscriptionManager, } @@ -139,7 +139,7 @@ impl BeefyRpcHandler { let future = stream.for_each(move |best_beefy| { let async_clone = closure_clone.clone(); async move { - *async_clone.write() = Some(best_beefy); + *async_clone.write() = Some((best_beefy.0, best_beefy.1)); } }); @@ -166,10 +166,23 @@ where _metadata: Self::Metadata, subscriber: Subscriber, ) { - let stream = self - .signed_commitment_stream - .subscribe() - .map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::(x)))); + let beefy_block = self.beefy_best_block.clone(); + let stream = + self.signed_commitment_stream + .subscribe() + .filter(move |x| { + let best_block_clone = beefy_block.clone(); + let best_block = best_block_clone.read(); + if let Some((.., best_block_num)) = *best_block { + if x.commitment.block_number <= best_block_num { + log::error!("Beefy rpc justification stream received a duplicate signed commitment"); + } + future::ready(x.commitment.block_number > best_block_num) + } else { + future::ready(true) + } + }) + .map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::(x)))); self.manager.add(subscriber, |sink| { stream @@ -192,6 +205,7 @@ where .read() .as_ref() .cloned() + .map(|x| x.0) .ok_or(Error::EndpointNotReady.into()); let future = async move { result }.boxed(); future.map_err(jsonrpc_core::Error::from).boxed() @@ -257,7 +271,7 @@ mod tests { let (io, _) = setup_io_handler_with_best_block_stream(stream); let hash = BlakeTwo256::hash(b"42"); - let r: Result<(), ()> = sender.notify(|| Ok(hash)); + let r: Result<(), ()> = sender.notify(|| Ok((hash, 0))); r.unwrap(); // Verify RPC `beefy_getFinalizedHead` returns expected hash. @@ -319,7 +333,10 @@ mod tests { // Unsubscribe again and fail assert_eq!( io.handle_request_sync(&unsub_req, meta), - Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()), + Some( + r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"# + .into() + ), ); } @@ -341,18 +358,17 @@ mod tests { r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#, meta.clone() ), - Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()) + Some( + r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"# + .into() + ) ); } - fn create_commitment() -> BeefySignedCommitment { + fn create_commitment(block_number: u64) -> BeefySignedCommitment { let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, "Hello World!".encode()); BeefySignedCommitment:: { - commitment: beefy_primitives::Commitment { - payload, - block_number: 5, - validator_set_id: 0, - }, + commitment: beefy_primitives::Commitment { payload, block_number, validator_set_id: 0 }, signatures: vec![], } } @@ -371,7 +387,7 @@ mod tests { let sub_id: String = serde_json::from_value(resp["result"].take()).unwrap(); // Notify with commitment - let commitment = create_commitment(); + let commitment = create_commitment(5); let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); r.unwrap(); @@ -393,4 +409,49 @@ mod tests { assert_eq!(recv_sub_id, sub_id); assert_eq!(recv_commitment, commitment); } + + #[test] + fn subscribe_dedupe_justifications() { + let (sender, stream) = BeefyBestBlockStream::::channel(); + let (io, commitment_sender) = setup_io_handler_with_best_block_stream(stream); + let (meta, receiver) = setup_session(); + + // Subscribe + let sub_request = + r#"{"jsonrpc":"2.0","method":"beefy_subscribeJustifications","params":[],"id":1}"#; + + let resp = io.handle_request_sync(sub_request, meta.clone()); + let mut resp: serde_json::Value = serde_json::from_str(&resp.unwrap()).unwrap(); + let sub_id: String = serde_json::from_value(resp["result"].take()).unwrap(); + + // Notify with duplicate commitments + let commitment = create_commitment(5); + let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); + r.unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(100)); + + let hash = BlakeTwo256::hash(b"42"); + let r: Result<(), ()> = sender.notify(|| Ok((hash, 5))); + r.unwrap(); + + let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment)); + r.unwrap(); + + let commitment = create_commitment(6); + + let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment)); + r.unwrap(); + + // Inspect what we received + // We should have received only two commitments + let recvs = futures::executor::block_on(receiver.take(2).collect::>()); + assert_eq!( + recvs, + vec![ + format!("{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_justifications\",\"params\":{{\"result\":\"0x046d68343048656c6c6f20576f726c64210500000000000000000000000000000004000000000000\",\"subscription\":\"{}\"}}}}", sub_id), + format!("{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_justifications\",\"params\":{{\"result\":\"0x046d68343048656c6c6f20576f726c64210600000000000000000000000000000004000000000000\",\"subscription\":\"{}\"}}}}", sub_id) + ] + ); + } } diff --git a/client/beefy/src/notification.rs b/client/beefy/src/notification.rs index 7c18d809f6efb..278bef7ef714f 100644 --- a/client/beefy/src/notification.rs +++ b/client/beefy/src/notification.rs @@ -25,12 +25,13 @@ pub type BeefySignedCommitment = /// The sending half of the notifications channel(s) used to send /// notifications about best BEEFY block from the gadget side. -pub type BeefyBestBlockSender = NotificationSender<::Hash>; +pub type BeefyBestBlockSender = + NotificationSender<(::Hash, NumberFor)>; /// The receiving half of a notifications channel used to receive /// notifications about best BEEFY blocks determined on the gadget side. pub type BeefyBestBlockStream = - NotificationStream<::Hash, BeefyBestBlockTracingKey>; + NotificationStream<(::Hash, NumberFor), BeefyBestBlockTracingKey>; /// The sending half of the notifications channel(s) used to send notifications /// about signed commitments generated at the end of a BEEFY round. diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 0c7d8d4ffdc9c..843c220d53035 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -249,7 +249,9 @@ where self.best_beefy_block = Some(*notification.header.number()); self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(notification.hash.clone())) + .notify(|| { + Ok::<_, ()>((notification.hash.clone(), *notification.header.number())) + }) .expect("forwards closure result; the closure always returns Ok; qed."); // this metric is kind of 'fake'. Best BEEFY block should only be updated once we @@ -374,7 +376,7 @@ where if let Err(err) = self.client.hash(block_num).map(|h| { if let Some(hash) = h { self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(hash)) + .notify(|| Ok::<_, ()>((hash, block_num))) .expect("forwards closure result; the closure always returns Ok; qed."); } }) {