Refactor SyncState #3297

Merged: 2 commits, Apr 20, 2020
13 changes: 4 additions & 9 deletions api/src/handlers/server_api.rs
@@ -79,16 +79,11 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
"header_sync".to_string(),
Some(json!({ "current_height": current_height, "highest_height": highest_height })),
),
SyncStatus::TxHashsetDownload {
start_time: _,
prev_update_time: _,
update_time: _,
prev_downloaded_size: _,
downloaded_size,
total_size,
} => (
SyncStatus::TxHashsetDownload(stats) => (
"txhashset_download".to_string(),
Some(json!({ "downloaded_size": downloaded_size, "total_size": total_size })),
Some(
json!({ "downloaded_size": stats.downloaded_size, "total_size": stats.total_size }),
),
),
SyncStatus::TxHashsetRangeProofsValidation {
rproofs,
3 changes: 2 additions & 1 deletion chain/src/lib.rs
@@ -46,5 +46,6 @@ pub use crate::chain::{Chain, MAX_ORPHAN_SIZE};
pub use crate::error::{Error, ErrorKind};
pub use crate::store::ChainStore;
pub use crate::types::{
BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetWriteStatus,
BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetDownloadStats,
TxHashsetWriteStatus,
};
101 changes: 74 additions & 27 deletions chain/src/types.rs
@@ -15,14 +15,13 @@
//! Base types that the block chain pipeline requires.

use chrono::prelude::{DateTime, Utc};
use std::sync::Arc;

use crate::core::core::hash::{Hash, Hashed, ZERO_HASH};
use crate::core::core::{Block, BlockHeader, HeaderVersion};
use crate::core::pow::Difficulty;
use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::error::{Error, ErrorKind};
use crate::util::RwLock;
use crate::util::{RwLock, RwLockWriteGuard};

bitflags! {
/// Options for block validation
@@ -40,7 +39,6 @@ bitflags! {

/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
#[allow(missing_docs)]
pub enum SyncStatus {
/// Initial State (we do not yet know if we are/should be syncing)
Initial,
@@ -51,28 +49,27 @@ pub enum SyncStatus {
AwaitingPeers(bool),
/// Downloading block headers
HeaderSync {
/// current node height
current_height: u64,
/// height of the most advanced peer
highest_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload {
start_time: DateTime<Utc>,
prev_update_time: DateTime<Utc>,
update_time: DateTime<Utc>,
prev_downloaded_size: u64,
downloaded_size: u64,
total_size: u64,
},
TxHashsetDownload(TxHashsetDownloadStats),
/// Setting up before validation
TxHashsetSetup,
/// Validating the kernels
TxHashsetKernelsValidation {
/// kernels validated
kernels: u64,
/// kernels in total
kernels_total: u64,
},
/// Validating the range proofs
TxHashsetRangeProofsValidation {
/// range proofs validated
rproofs: u64,
/// range proofs in total
rproofs_total: u64,
},
/// Finalizing the new state
@@ -81,24 +78,57 @@
TxHashsetDone,
/// Downloading blocks
BodySync {
/// current node height
current_height: u64,
/// height of the most advanced peer
highest_height: u64,
},
/// Shutdown
Shutdown,
}

/// Stats for TxHashsetDownload stage
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
pub struct TxHashsetDownloadStats {
/// when download started
pub start_time: DateTime<Utc>,
/// time of the previous update
pub prev_update_time: DateTime<Utc>,
/// time of the latest update
pub update_time: DateTime<Utc>,
/// size of the previous chunk
pub prev_downloaded_size: u64,
/// size of the latest chunk
pub downloaded_size: u64,
/// downloaded since the start
pub total_size: u64,
}

impl Default for TxHashsetDownloadStats {
fn default() -> Self {
TxHashsetDownloadStats {
start_time: Utc::now(),
update_time: Utc::now(),
prev_update_time: Utc::now(),
prev_downloaded_size: 0,
downloaded_size: 0,
total_size: 0,
}
}
}
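
Because TxHashsetDownloadStats derives Copy and its fields are plain values, a progress update can shift the current readings into the prev_* slots with struct update syntax. A minimal sketch with hypothetical numbers (assumes chrono's Utc and the struct above are in scope):

use chrono::prelude::Utc;

// Fresh record: all timestamps are "now", all sizes are zero.
let prev = TxHashsetDownloadStats::default();

// One progress update: carry the latest values into the prev_* fields,
// keep start_time and total_size from the previous record.
let next = TxHashsetDownloadStats {
    prev_update_time: prev.update_time,
    update_time: Utc::now(),
    prev_downloaded_size: prev.downloaded_size,
    downloaded_size: 1_000_000, // hypothetical: ~1 MB received so far
    ..prev
};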

/// Current sync state. Encapsulates the current SyncStatus.
pub struct SyncState {
current: RwLock<SyncStatus>,
sync_error: Arc<RwLock<Option<Error>>>,
sync_error: RwLock<Option<Error>>,
}

impl SyncState {
/// Return a new SyncState initialized to NoSync
pub fn new() -> SyncState {
SyncState {
current: RwLock::new(SyncStatus::Initial),
sync_error: Arc::new(RwLock::new(None)),
sync_error: RwLock::new(None),
}
}

@@ -114,37 +144,54 @@ impl SyncState {
}

/// Update the syncing status
pub fn update(&self, new_status: SyncStatus) {
if self.status() == new_status {
Contributor Author: This check is not great; we take a read lock, compare and then release the lock, so by the time we obtain the write lock the state may already be different.

return;
}
pub fn update(&self, new_status: SyncStatus) -> bool {
let status = self.current.write();
self.update_with_guard(new_status, status)
}

let mut status = self.current.write();
fn update_with_guard(
&self,
new_status: SyncStatus,
mut status: RwLockWriteGuard<SyncStatus>,
) -> bool {
if *status == new_status {
return false;
}

debug!("sync_state: sync_status: {:?} -> {:?}", *status, new_status,);

*status = new_status;
true
}
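
The refactored update now compares and assigns under a single write lock, so the race described in the review comment above (read, release, then write against possibly changed state) cannot occur, and the bool return tells the caller whether anything changed. A hypothetical call site, assuming a sync_state: &SyncState and the log macros grin already uses:

if sync_state.update(SyncStatus::TxHashsetSetup) {
    // Only reached when the status really transitioned.
    debug!("sync status changed to TxHashsetSetup");
}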

/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, new_status: SyncStatus) -> bool {
if let SyncStatus::TxHashsetDownload { .. } = new_status {
let mut status = self.current.write();
*status = new_status;
true
/// Update the syncing status if predicate f is satisfied
pub fn update_if<F>(&self, new_status: SyncStatus, f: F) -> bool
where
F: Fn(SyncStatus) -> bool,
Contributor Author: It's a bit awkward to have a predicate here, but some variants of SyncStatus have state attached, which makes matching painful.

{
let status = self.current.write();
if f(*status) {
self.update_with_guard(new_status, status)
} else {
false
}
}
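
A minimal usage sketch of update_if, mirroring the transition state_sync.rs performs further down; since SyncStatus is Copy the predicate takes it by value. On Rust 1.42+ the closure could also use the matches! macro instead of an explicit match:

let switched = sync_state.update_if(
    SyncStatus::BodySync {
        current_height: 0,
        highest_height: 0,
    },
    |s| matches!(s, SyncStatus::TxHashsetDone),
);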

/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
}

/// Communicate sync error
pub fn set_sync_error(&self, error: Error) {
*self.sync_error.write() = Some(error);
}

/// Get sync error
pub fn sync_error(&self) -> Arc<RwLock<Option<Error>>> {
Arc::clone(&self.sync_error)
pub fn sync_error(&self) -> Option<String> {
Contributor Author: We don't need the Error per se, just the fact of its existence and its description; returning a String makes life much easier.

Member: 👍 yeah, that makes sense.

self.sync_error
.read()
.as_ref()
.and_then(|e| Some(e.to_string()))
}
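
Returning Option<String> means callers no longer clone an Arc and hold a read guard just to inspect the error. A hedged sketch of a call site, assuming the existing clear_sync_error helper below:

if let Some(err) = sync_state.sync_error() {
    error!("state_sync: error = {}. restart fast sync", err);
    sync_state.clear_sync_error();
}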

/// Clear sync error
30 changes: 15 additions & 15 deletions servers/src/common/adapters.rs
@@ -23,7 +23,9 @@ use std::sync::{Arc, Weak};
use std::thread;
use std::time::Instant;

use crate::chain::{self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus};
use crate::chain::{
self, BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, TxHashsetDownloadStats,
};
use crate::common::hooks::{ChainEvents, NetEvents};
use crate::common::types::{ChainValidationMode, DandelionEpoch, ServerConfig};
use crate::core::core::hash::{Hash, Hashed};
@@ -399,20 +401,18 @@ impl p2p::ChainAdapter for NetToChainAdapter {
total_size: u64,
) -> bool {
match self.sync_state.status() {
SyncStatus::TxHashsetDownload {
update_time: old_update_time,
downloaded_size: old_downloaded_size,
..
} => self
.sync_state
.update_txhashset_download(SyncStatus::TxHashsetDownload {
start_time,
prev_update_time: old_update_time,
update_time: Utc::now(),
prev_downloaded_size: old_downloaded_size,
downloaded_size,
total_size,
}),
SyncStatus::TxHashsetDownload(prev) => {
self.sync_state
.update_txhashset_download(TxHashsetDownloadStats {
start_time,
prev_update_time: prev.update_time,
update_time: Utc::now(),
prev_downloaded_size: prev.downloaded_size,
downloaded_size,
total_size,
});
true
}
_ => false,
}
}
56 changes: 24 additions & 32 deletions servers/src/grin/sync/state_sync.rs
@@ -68,13 +68,9 @@ impl StateSync {
let mut sync_need_restart = false;

// check sync error
{
let clone = self.sync_state.sync_error();
if let Some(ref sync_error) = *clone.read() {
error!("state_sync: error = {:?}. restart fast sync", sync_error);
sync_need_restart = true;
}
drop(clone);
if let Some(sync_error) = self.sync_state.sync_error() {
error!("state_sync: error = {}. restart fast sync", sync_error);
sync_need_restart = true;
}

// check peer connection status of this sync
@@ -92,15 +88,16 @@

// if txhashset downloaded and validated successfully, we switch to BodySync state,
// and we need to call state_sync_reset() to make it ready for the next possible state sync.
let done = if let SyncStatus::TxHashsetDone = self.sync_state.status() {
self.sync_state.update(SyncStatus::BodySync {
let done = self.sync_state.update_if(
SyncStatus::BodySync {
current_height: 0,
highest_height: 0,
});
true
} else {
false
};
},
|s| match s {
SyncStatus::TxHashsetDone => true,
_ => false,
},
);

if sync_need_restart || done {
self.state_sync_reset();
@@ -137,24 +134,19 @@

// to avoid the confusing log,
// update the final HeaderSync state mainly for 'current_height'
{
let status = self.sync_state.status();
if let SyncStatus::HeaderSync { .. } = status {
self.sync_state.update(SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height,
});
}
}

self.sync_state.update(SyncStatus::TxHashsetDownload {
start_time: Utc::now(),
prev_update_time: Utc::now(),
update_time: Utc::now(),
prev_downloaded_size: 0,
downloaded_size: 0,
total_size: 0,
});
self.sync_state.update_if(
SyncStatus::HeaderSync {
current_height: header_head.height,
highest_height,
},
|s| match s {
SyncStatus::HeaderSync { .. } => true,
_ => false,
},
);

self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}
}
true
25 changes: 7 additions & 18 deletions src/bin/tui/status.rs
@@ -48,31 +48,20 @@ impl TUIStatusView {
};
format!("Sync step 1/7: Downloading headers: {}%", percent)
}
SyncStatus::TxHashsetDownload {
start_time,
prev_update_time,
update_time: _,
prev_downloaded_size,
downloaded_size,
total_size,
} => {
if total_size > 0 {
let percent = if total_size > 0 {
downloaded_size * 100 / total_size
} else {
0
};
let start = prev_update_time.timestamp_nanos();
SyncStatus::TxHashsetDownload(stat) => {
if stat.total_size > 0 {
let percent = stat.downloaded_size * 100 / stat.total_size;
let start = stat.prev_update_time.timestamp_nanos();
let fin = Utc::now().timestamp_nanos();
let dur_ms = (fin - start) as f64 * NANO_TO_MILLIS;

format!("Sync step 2/7: Downloading {}(MB) chain state for state sync: {}% at {:.1?}(kB/s)",
total_size / 1_000_000,
stat.total_size / 1_000_000,
percent,
if dur_ms > 1.0f64 { downloaded_size.saturating_sub(prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 },
if dur_ms > 1.0f64 { stat.downloaded_size.saturating_sub(stat.prev_downloaded_size) as f64 / dur_ms as f64 } else { 0f64 },
)
} else {
let start = start_time.timestamp_millis();
let start = stat.start_time.timestamp_millis();
let fin = Utc::now().timestamp_millis();
let dur_secs = (fin - start) / 1000;

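For reference, the rate printed above comes out in kB/s because a byte delta is divided by a duration in milliseconds (1 byte per millisecond is 1,000 bytes per second, i.e. about 1 kB/s). A small worked example with made-up numbers:

let downloaded_size: u64 = 2_500_000;      // bytes at the latest update
let prev_downloaded_size: u64 = 1_000_000; // bytes at the previous update
let dur_ms: f64 = 3_000.0;                 // 3 seconds between updates

// Same arithmetic as the TUI: bytes per millisecond reads as kB/s.
let rate = downloaded_size.saturating_sub(prev_downloaded_size) as f64 / dur_ms;
assert_eq!(rate, 500.0); // 1_500_000 bytes over 3_000 ms -> 500 kB/s
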
2 changes: 1 addition & 1 deletion util/src/lib.rs
@@ -29,7 +29,7 @@ extern crate lazy_static;
extern crate serde_derive;
// Re-export so only has to be included once
pub use parking_lot::Mutex;
pub use parking_lot::{RwLock, RwLockReadGuard};
pub use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};

// Re-export so only has to be included once
pub use secp256k1zkp as secp;