Optimize and fix issues in the PIBD cache.
bayk committed Aug 16, 2024
1 parent e512bf3 commit 9bf49cf
Showing 4 changed files with 52 additions and 27 deletions.
7 changes: 2 additions & 5 deletions chain/src/pibd_params.rs
@@ -28,20 +28,17 @@ pub const RANGEPROOF_SEGMENT_HEIGHT: u8 = 11;
/// Kernel segment height assumed for requests and segment calculation
pub const KERNEL_SEGMENT_HEIGHT: u8 = 11;

-/// Maximum number of received segments to cache (across all trees) before we stop requesting others
-pub const MAX_CACHED_SEGMENTS: usize = 15;
-
/// How long the state sync should wait after requesting a segment from a peer before
/// deciding the segment isn't going to arrive. The syncer will then re-request the segment
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60;

/// Number of simultaneous requests for segments we should make per available peer. Note this is currently
/// divisible by 3 to try to spread requests evenly among the 3 main MMRs (Bitmap segments
/// will always be requested first)
-pub const SEGMENT_REQUEST_PER_PEER: usize = 3;
+pub const SEGMENT_REQUEST_PER_PEER: usize = 6;
/// Maximum number of simultaneous requests. Note that the data is processed in a single thread, so
/// the throughput will not be high. A limit this size should load the CPU well at the end of the sync process.
-pub const SEGMENT_REQUEST_LIMIT: usize = 12;
+pub const SEGMENT_REQUEST_LIMIT: usize = 24;

/// Maximum number of stale requests per peer. If a peer has more outstanding requests than this, no new data will be requested from it.
pub const STALE_REQUESTS_PER_PEER: u32 = 5;
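For context on the retuned constants, here is a minimal sketch (not the node's actual code) of how a per-peer cap and a global cap might combine to bound in-flight segment requests; request_budget, peer_count, and inflight are hypothetical names introduced for illustration.

const SEGMENT_REQUEST_PER_PEER: usize = 6;
const SEGMENT_REQUEST_LIMIT: usize = 24;

/// Number of new segment requests that may be issued right now.
fn request_budget(peer_count: usize, inflight: usize) -> usize {
	// Per-peer cap scaled by available peers, clamped by the global cap.
	let cap = (peer_count * SEGMENT_REQUEST_PER_PEER).min(SEGMENT_REQUEST_LIMIT);
	cap.saturating_sub(inflight)
}

fn main() {
	// With 4 peers the scaled cap (24) equals the global cap, so the single
	// processing thread stays busy without over-requesting.
	assert_eq!(request_budget(4, 10), 14);
	// With 2 peers the per-peer cap (12) is the binding limit.
	assert_eq!(request_budget(2, 10), 2);
}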
63 changes: 43 additions & 20 deletions chain/src/txhashset/desegmenter.rs
@@ -61,9 +61,6 @@ pub struct Desegmenter {
bitmap_mmr_leaf_count: u64,
bitmap_mmr_size: u64,

-/// Maximum number of segments to cache before we stop requesting others
-max_cached_segments: usize,
-
/// In-memory 'raw' bitmap corresponding to contents of bitmap accumulator
bitmap_cache: Option<Bitmap>,

@@ -107,8 +104,6 @@ impl Desegmenter {
bitmap_mmr_leaf_count: 0,
bitmap_mmr_size: 0,

-max_cached_segments: pibd_params::MAX_CACHED_SEGMENTS,
-
bitmap_cache: None,

all_segments_complete: false,
@@ -409,7 +404,7 @@ impl Desegmenter {
self.apply_output_segment(idx)?;
}
} else {
-if self.output_segment_cache.len() >= self.max_cached_segments {
+if !self.output_segment_cache.is_empty() {
self.output_segment_cache = vec![];
}
}
@@ -424,7 +419,7 @@
self.apply_rangeproof_segment(idx)?;
}
} else {
-if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
+if !self.rangeproof_segment_cache.is_empty() {
self.rangeproof_segment_cache = vec![];
}
}
@@ -439,7 +434,7 @@
self.apply_kernel_segment(idx)?;
}
} else {
-if self.kernel_segment_cache.len() >= self.max_cached_segments {
+if !self.kernel_segment_cache.is_empty() {
self.kernel_segment_cache = vec![];
}
}
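The three hunks above share one behavioral change: instead of clearing a cache only once it reaches max_cached_segments, any non-empty cache that cannot supply the next applicable segment is dropped. A simplified sketch of that semantics (assumed from the diff), with plain u64 indexes standing in for segments:

fn apply_or_reset(cache: &mut Vec<u64>, next_expected: u64) -> Option<u64> {
	if let Some(pos) = cache.iter().position(|&idx| idx == next_expected) {
		// The next expected segment is cached: apply it.
		Some(cache.remove(pos))
	} else {
		// Nothing applicable: drop the whole cache (the old code waited
		// until it reached max_cached_segments before doing this).
		if !cache.is_empty() {
			cache.clear();
		}
		None
	}
}

fn main() {
	let mut cache = vec![7, 5, 9];
	assert_eq!(apply_or_reset(&mut cache, 5), Some(5)); // 5 is applied
	assert_eq!(apply_or_reset(&mut cache, 6), None); // 6 missing: cache dropped
	assert!(cache.is_empty());
}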
@@ -464,13 +459,21 @@
self.default_bitmap_segment_height,
);
// Advance iterator to next expected segment
+let mut first_found = true;
+let mut processed = 0;
while let Some(id) = identifier_iter.next() {
if id.segment_pos_range(self.bitmap_mmr_size).1 > local_pmmr_size {
+if first_found {
+	self.bitmap_segment_cache
+		.retain(|segm| segm.id().idx >= id.idx);
+	first_found = false;
+}
if !self.has_bitmap_segment_with_id(id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Bitmap, id));
if return_vec.len() >= max_elements {
return Ok(return_vec);
}
}
+processed += 1;
+if processed > max_elements {
+	return Ok(return_vec);
+}
}
}
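A condensed sketch of the two guards introduced here, assuming the same intent as the hunk: prune cached entries older than the first still-needed index, and cap the iterator work done per call. next_needed and its u64 indexes are illustrative stand-ins for the real segment types.

fn next_needed(
	cache: &mut Vec<u64>,
	needed: impl Iterator<Item = u64>,
	max_elements: usize,
) -> Vec<u64> {
	let mut requests = Vec::new();
	let mut first_found = true;
	let mut processed = 0usize;
	for idx in needed {
		if first_found {
			// Everything below the first still-needed index has expired.
			cache.retain(|&c| c >= idx);
			first_found = false;
		}
		if !cache.contains(&idx) {
			requests.push(idx);
			if requests.len() >= max_elements {
				break;
			}
		}
		processed += 1;
		if processed > max_elements {
			// Cap the CPU spent scanning identifiers per call.
			break;
		}
	}
	requests
}

fn main() {
	let mut cache = vec![1, 3, 4];
	// Needed segments start at index 3, so cached entry 1 is pruned,
	// and the missing segments 5 and 6 are requested.
	let requests = next_needed(&mut cache, 3u64..=6, 10);
	assert_eq!(requests, vec![5, 6]);
	assert_eq!(cache, vec![3, 4]);
}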
@@ -494,21 +497,28 @@
);

let mut elems_added = 0;
+let mut first_found = true;
+let mut processed = 0;
while let Some(output_id) = output_identifier_iter.next() {
// Advance output iterator to next needed position
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if last <= local_output_mmr_size {
continue;
}
-if self.output_segment_cache.len() >= self.max_cached_segments {
-	break;

+if first_found {
+	// Let's clean up old expired items
+	self.output_segment_cache
+		.retain(|segm| segm.id().idx >= output_id.idx);
+	first_found = false;
+}
if !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
-if elems_added == max_elements / 3 {
+processed += 1;
+if processed > max_elements || elems_added == max_elements / 3 {
break;
}
}
@@ -519,20 +529,26 @@
);

elems_added = 0;
+let mut first_found = true;
+let mut processed = 0;
while let Some(rp_id) = rangeproof_identifier_iter.next() {
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
// Advance rangeproof iterator to next needed position
if last <= local_rangeproof_mmr_size {
continue;
}
-if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
-	break;
+if first_found {
+	// Let's clean up old expired items
+	self.rangeproof_segment_cache
+		.retain(|segm| segm.id().idx >= rp_id.idx);
+	first_found = false;
+}
if !self.has_rangeproof_segment_with_id(rp_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
-if elems_added == max_elements / 3 {
+processed += 1;
+if processed > max_elements || elems_added == max_elements / 3 {
break;
}
}
@@ -543,21 +559,28 @@
);

elems_added = 0;
+let mut first_found = true;
+let mut processed = 0;
while let Some(k_id) = kernel_identifier_iter.next() {
// Advance kernel iterator to next needed position
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last <= local_kernel_mmr_size {
continue;
}
-if self.kernel_segment_cache.len() >= self.max_cached_segments {
-	break;
+if first_found {
+	// Let's clean up old expired items
+	self.kernel_segment_cache
+		.retain(|segm| segm.id().idx >= k_id.idx);
+	first_found = false;
+}

if !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
-if elems_added == max_elements / 3 {
+processed += 1;
+if processed > max_elements || elems_added == max_elements / 3 {
break;
}
}
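Each of the three loops above stops after max_elements / 3 new requests, which is why SEGMENT_REQUEST_PER_PEER is kept divisible by 3. A toy sketch of that even split across the three MMRs (names are illustrative, not the node's API):

#[derive(Debug)]
enum SegmentType {
	Output,
	RangeProof,
	Kernel,
}

fn split_budget(max_elements: usize) -> [(SegmentType, usize); 3] {
	// Each of the three MMRs gets an equal share of the request budget.
	let share = max_elements / 3;
	[
		(SegmentType::Output, share),
		(SegmentType::RangeProof, share),
		(SegmentType::Kernel, share),
	]
}

fn main() {
	// With SEGMENT_REQUEST_LIMIT = 24, each tree may queue up to 8 requests.
	for (ty, share) in split_budget(24) {
		println!("{:?}: up to {} concurrent segment requests", ty, share);
	}
}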
5 changes: 5 additions & 0 deletions servers/src/grin/sync/state_sync.rs
@@ -20,6 +20,7 @@ use grin_p2p::ReasonForBan;
use grin_util::secp::rand::Rng;
use rand::seq::SliceRandom;
use std::sync::Arc;
+use std::{thread, time};

use crate::chain::{self, pibd_params, SyncState, SyncStatus};
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType};
@@ -246,6 +247,10 @@ impl StateSync {
}

if has_segmenter {
+// Sleep some extra time here: checking requests is CPU-intensive and there is
+// not much to optimize in the pass over all the data, so at least avoid doing it too often.
+thread::sleep(time::Duration::from_millis(500));

// Continue our PIBD process (which returns true if all segments are in)
match self.continue_pibd(&archive_header) {
Ok(true) => {
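The idea behind the added sleep, as a standalone sketch (assumed context, not the server's actual loop): when each poll is itself expensive, a longer interval trades a little latency for a large CPU saving.

use std::{thread, time};

fn main() {
	let poll_interval = time::Duration::from_millis(500);
	let mut checks = 0;
	loop {
		// Stand-in for the expensive "which segments are still needed" scan.
		checks += 1;
		if checks >= 3 {
			break;
		}
		thread::sleep(poll_interval);
	}
	println!("finished after {} checks", checks);
}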
4 changes: 2 additions & 2 deletions servers/src/grin/sync/syncer.rs
@@ -154,8 +154,8 @@ impl SyncRunner {
if self.stop_state.is_stopped() {
break;
}

-thread::sleep(time::Duration::from_millis(10));
+// Grin uses 10 ms here, but during PIBD sync that wastes CPU: each check consumes significant resources.
+thread::sleep(time::Duration::from_millis(500));

let currently_syncing = self.sync_state.is_syncing();

