diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 9907ec1ca9..880c2e84ef 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -59,23 +59,25 @@ pub trait Finalizer { fn finalize(&self, justification: J) -> Result<(), Self::Error>; } -/// A notification about the chain state changing. -pub enum ChainStateNotification { +/// A notification about the chain status changing. +pub enum ChainStatusNotification { /// A block has been imported. BlockImported(BI), /// A block has been finalized. BlockFinalized(BI), } -/// A stream of notifications about the chain state in the database changing. +/// A stream of notifications about the chain status in the database changing. #[async_trait::async_trait] -pub trait ChainStateNotifier { - /// Returns a chain state notification when it is available. - async fn next(&self) -> ChainStateNotification; +pub trait ChainStatusNotifier { + type Error: Display; + + /// Returns a chain status notification when it is available. + async fn next(&mut self) -> Result, Self::Error>; } -/// The state of a block in the database. -pub enum BlockState { +/// The status of a block in the database. +pub enum BlockStatus { /// The block is justified and thus finalized. Justified(J), /// The block is present, might be finalized if a descendant is justified. @@ -84,10 +86,10 @@ pub enum BlockState { Unknown, } -/// The knowledge about the chain state. -pub trait ChainState { - /// The state of the block. - fn state_of(&self, id: ::Identifier) -> BlockState; +/// The knowledge about the chain status. +pub trait ChainStatus { + /// The status of the block. + fn status_of(&self, id: ::Identifier) -> BlockStatus; /// The header of the best block. fn best_block(&self) -> J::Header; diff --git a/finality-aleph/src/sync/substrate.rs b/finality-aleph/src/sync/substrate/mod.rs similarity index 98% rename from finality-aleph/src/sync/substrate.rs rename to finality-aleph/src/sync/substrate/mod.rs index 5516f80ba2..e7e46924a4 100644 --- a/finality-aleph/src/sync/substrate.rs +++ b/finality-aleph/src/sync/substrate/mod.rs @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One}; use crate::sync::{BlockIdentifier, Header}; +mod status_notifier; + #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockId> { hash: H::Hash, diff --git a/finality-aleph/src/sync/substrate/status_notifier.rs b/finality-aleph/src/sync/substrate/status_notifier.rs new file mode 100644 index 0000000000..042e05104a --- /dev/null +++ b/finality-aleph/src/sync/substrate/status_notifier.rs @@ -0,0 +1,78 @@ +use std::fmt::{Display, Error as FmtError, Formatter}; + +use aleph_primitives::BlockNumber; +use futures::StreamExt; +use sc_client_api::client::{FinalityNotifications, ImportNotifications}; +use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader}; +use tokio::select; + +use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifier, Header}; + +/// What can go wrong when waiting for next chain status notification. +#[derive(Debug)] +pub enum Error { + JustificationStreamClosed, + ImportStreamClosed, +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use Error::*; + match self { + JustificationStreamClosed => { + write!(f, "finalization notification stream has ended") + } + ImportStreamClosed => { + write!(f, "import notification stream has ended") + } + } + } +} + +/// Substrate specific implementation of `ChainStatusNotifier`. +pub struct SubstrateChainStatusNotifier +where + B: BlockT, +{ + finality_notifications: FinalityNotifications, + import_notifications: ImportNotifications, +} + +impl SubstrateChainStatusNotifier +where + B: BlockT, +{ + fn new( + finality_notifications: FinalityNotifications, + import_notifications: ImportNotifications, + ) -> Self { + Self { + finality_notifications, + import_notifications, + } + } +} + +#[async_trait::async_trait] +impl ChainStatusNotifier> for SubstrateChainStatusNotifier +where + B: BlockT, + B::Header: SubstrateHeader, +{ + type Error = Error; + + async fn next(&mut self) -> Result>, Self::Error> { + select! { + maybe_block = self.finality_notifications.next() => { + maybe_block + .map(|block| ChainStatusNotification::BlockFinalized(block.header.id())) + .ok_or(Error::JustificationStreamClosed) + }, + maybe_block = self.import_notifications.next() => { + maybe_block + .map(|block| ChainStatusNotification::BlockImported(block.header.id())) + .ok_or(Error::ImportStreamClosed) + } + } + } +}