-
Notifications
You must be signed in to change notification settings - Fork 992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor SyncState #3297
Refactor SyncState #3297
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,14 +15,13 @@ | |
//! Base types that the block chain pipeline requires. | ||
|
||
use chrono::prelude::{DateTime, Utc}; | ||
use std::sync::Arc; | ||
|
||
use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; | ||
use crate::core::core::{Block, BlockHeader, HeaderVersion}; | ||
use crate::core::pow::Difficulty; | ||
use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; | ||
use crate::error::{Error, ErrorKind}; | ||
use crate::util::RwLock; | ||
use crate::util::{RwLock, RwLockWriteGuard}; | ||
|
||
bitflags! { | ||
/// Options for block validation | ||
|
@@ -40,7 +39,6 @@ bitflags! { | |
|
||
/// Various status sync can be in, whether it's fast sync or archival. | ||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] | ||
#[allow(missing_docs)] | ||
pub enum SyncStatus { | ||
/// Initial State (we do not yet know if we are/should be syncing) | ||
Initial, | ||
|
@@ -51,28 +49,27 @@ pub enum SyncStatus { | |
AwaitingPeers(bool), | ||
/// Downloading block headers | ||
HeaderSync { | ||
/// current node height | ||
current_height: u64, | ||
/// height of the most advanced peer | ||
highest_height: u64, | ||
}, | ||
/// Downloading the various txhashsets | ||
TxHashsetDownload { | ||
start_time: DateTime<Utc>, | ||
prev_update_time: DateTime<Utc>, | ||
update_time: DateTime<Utc>, | ||
prev_downloaded_size: u64, | ||
downloaded_size: u64, | ||
total_size: u64, | ||
}, | ||
TxHashsetDownload(TxHashsetDownloadStats), | ||
/// Setting up before validation | ||
TxHashsetSetup, | ||
/// Validating the kernels | ||
TxHashsetKernelsValidation { | ||
/// kernels validated | ||
kernels: u64, | ||
/// kernels in total | ||
kernels_total: u64, | ||
}, | ||
/// Validating the range proofs | ||
TxHashsetRangeProofsValidation { | ||
/// range proofs validated | ||
rproofs: u64, | ||
/// range proofs in total | ||
rproofs_total: u64, | ||
}, | ||
/// Finalizing the new state | ||
|
@@ -81,24 +78,57 @@ pub enum SyncStatus { | |
TxHashsetDone, | ||
/// Downloading blocks | ||
BodySync { | ||
/// current node height | ||
current_height: u64, | ||
/// height of the most advanced peer | ||
highest_height: u64, | ||
}, | ||
/// Shutdown | ||
Shutdown, | ||
} | ||
|
||
/// Stats for TxHashsetDownload stage | ||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] | ||
pub struct TxHashsetDownloadStats { | ||
/// when download started | ||
pub start_time: DateTime<Utc>, | ||
/// time of the previous update | ||
pub prev_update_time: DateTime<Utc>, | ||
/// time of the latest update | ||
pub update_time: DateTime<Utc>, | ||
/// size of the previous chunk | ||
pub prev_downloaded_size: u64, | ||
/// size of the the latest chunk | ||
pub downloaded_size: u64, | ||
/// downloaded since the start | ||
pub total_size: u64, | ||
} | ||
|
||
impl Default for TxHashsetDownloadStats { | ||
fn default() -> Self { | ||
TxHashsetDownloadStats { | ||
start_time: Utc::now(), | ||
update_time: Utc::now(), | ||
prev_update_time: Utc::now(), | ||
prev_downloaded_size: 0, | ||
downloaded_size: 0, | ||
total_size: 0, | ||
} | ||
} | ||
} | ||
|
||
/// Current sync state. Encapsulates the current SyncStatus. | ||
pub struct SyncState { | ||
current: RwLock<SyncStatus>, | ||
sync_error: Arc<RwLock<Option<Error>>>, | ||
sync_error: RwLock<Option<Error>>, | ||
} | ||
|
||
impl SyncState { | ||
/// Return a new SyncState initialize to NoSync | ||
pub fn new() -> SyncState { | ||
SyncState { | ||
current: RwLock::new(SyncStatus::Initial), | ||
sync_error: Arc::new(RwLock::new(None)), | ||
sync_error: RwLock::new(None), | ||
} | ||
} | ||
|
||
|
@@ -114,37 +144,54 @@ impl SyncState { | |
} | ||
|
||
/// Update the syncing status | ||
pub fn update(&self, new_status: SyncStatus) { | ||
if self.status() == new_status { | ||
return; | ||
} | ||
pub fn update(&self, new_status: SyncStatus) -> bool { | ||
let status = self.current.write(); | ||
self.update_with_guard(new_status, status) | ||
} | ||
|
||
let mut status = self.current.write(); | ||
fn update_with_guard( | ||
&self, | ||
new_status: SyncStatus, | ||
mut status: RwLockWriteGuard<SyncStatus>, | ||
) -> bool { | ||
if *status == new_status { | ||
return false; | ||
} | ||
|
||
debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,); | ||
|
||
*status = new_status; | ||
true | ||
} | ||
|
||
/// Update txhashset downloading progress | ||
pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool { | ||
if let SyncStatus::TxHashsetDownload { .. } = new_status { | ||
let mut status = self.current.write(); | ||
*status = new_status; | ||
true | ||
/// Update the syncing status if predicate f is satisfied | ||
pub fn update_if<F>(&self, new_status: SyncStatus, f: F) -> bool | ||
where | ||
F: Fn(SyncStatus) -> bool, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a bit awkward to have predicate here, but some variants of |
||
{ | ||
let status = self.current.write(); | ||
if f(*status) { | ||
self.update_with_guard(new_status, status) | ||
} else { | ||
false | ||
} | ||
} | ||
|
||
/// Update txhashset downloading progress | ||
pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) { | ||
*self.current.write() = SyncStatus::TxHashsetDownload(stats); | ||
} | ||
|
||
/// Communicate sync error | ||
pub fn set_sync_error(&self, error: Error) { | ||
*self.sync_error.write() = Some(error); | ||
} | ||
|
||
/// Get sync error | ||
pub fn sync_error(&self) -> Arc<RwLock<Option<Error>>> { | ||
Arc::clone(&self.sync_error) | ||
pub fn sync_error(&self) -> Option<String> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need Error per se, just the fact of its existence and description, returning String makes life much easier There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 yeah that makes sense. |
||
self.sync_error | ||
.read() | ||
.as_ref() | ||
.and_then(|e| Some(e.to_string())) | ||
} | ||
|
||
/// Clear sync error | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is not great, we take a read lock, compare and then release the lock, so when we obtain a write lock the state may be different already.