diff --git a/finality-aleph/src/sync/mod.rs b/finality-aleph/src/sync/mod.rs index 880c2e84ef..801e0b7902 100644 --- a/finality-aleph/src/sync/mod.rs +++ b/finality-aleph/src/sync/mod.rs @@ -4,8 +4,11 @@ use std::{ }; mod substrate; +mod task_queue; mod ticker; +const LOG_TARGET: &str = "aleph-block-sync"; + /// The identifier of a block, the least amount of knowledge we can have about a block. pub trait BlockIdentifier: Clone + Hash + Debug + Eq { /// The block number, useful when reasoning about hopeless forks. diff --git a/finality-aleph/src/sync/task_queue.rs b/finality-aleph/src/sync/task_queue.rs new file mode 100644 index 0000000000..8f28521a5e --- /dev/null +++ b/finality-aleph/src/sync/task_queue.rs @@ -0,0 +1,118 @@ +use std::{ + cmp::Ordering, + collections::BinaryHeap, + fmt::{Debug, Formatter}, +}; + +use log::warn; +use tokio::time::{sleep, Duration, Instant}; + +use crate::sync::LOG_TARGET; + +#[derive(Clone)] +struct ScheduledTask { + task: T, + scheduled_time: Instant, +} + +impl Eq for ScheduledTask {} + +impl PartialEq for ScheduledTask { + fn eq(&self, other: &Self) -> bool { + other.scheduled_time.eq(&self.scheduled_time) + } +} + +impl PartialOrd for ScheduledTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ScheduledTask { + /// Compare tasks so that earlier times come first in a max-heap. + fn cmp(&self, other: &Self) -> Ordering { + other.scheduled_time.cmp(&self.scheduled_time) + } +} + +#[derive(Clone, Default)] +pub struct TaskQueue { + queue: BinaryHeap>, +} + +impl Debug for TaskQueue { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TaskQueue") + .field("task count", &self.queue.len()) + .finish() + } +} + +/// Implements a queue allowing for scheduling tasks for some time in the future. +/// +/// Does not actually execute any tasks, is used for ordering in time only. +impl TaskQueue { + /// Creates an empty queue. + pub fn new() -> Self { + Self { + queue: BinaryHeap::new(), + } + } + + /// Schedules `task` for after `delay`. + pub fn schedule_in(&mut self, task: T, delay: Duration) { + let scheduled_time = match Instant::now().checked_add(delay) { + Some(time) => time, + None => { + warn!( + target: LOG_TARGET, + "Could not schedule task in {:?}. Instant out of bound.", delay + ); + return; + } + }; + self.queue.push(ScheduledTask { + task, + scheduled_time, + }); + } + + /// Awaits for the first and most overdue task and returns it. Returns `None` if there are no tasks. + pub async fn pop(&mut self) -> Option { + let scheduled_task = self.queue.peek()?; + + let duration = scheduled_task + .scheduled_time + .saturating_duration_since(Instant::now()); + if !duration.is_zero() { + sleep(duration).await; + } + self.queue.pop().map(|t| t.task) + } +} + +#[cfg(test)] +mod tests { + use tokio::time::{timeout, Duration}; + + use super::TaskQueue; + + #[tokio::test] + async fn test_scheduling() { + let mut q = TaskQueue::new(); + q.schedule_in(2, Duration::from_millis(50)); + q.schedule_in(1, Duration::from_millis(20)); + + assert!(timeout(Duration::from_millis(5), q.pop()).await.is_err()); + assert_eq!( + timeout(Duration::from_millis(20), q.pop()).await, + Ok(Some(1)) + ); + assert!(timeout(Duration::from_millis(10), q.pop()).await.is_err()); + assert_eq!( + timeout(Duration::from_millis(50), q.pop()).await, + Ok(Some(2)) + ); + } +}