[PIBD_IMPL] Update number of simultaneous peer requests for segments (#3696)

* cleanup of segment request list

* allow for more simultaneous requests during state sync

* up number of simultaneous peer requests for segments
yeastplume authored Feb 24, 2022
1 parent 5630cf2 commit bf48e52
Showing 3 changed files with 72 additions and 67 deletions.
34 changes: 14 additions & 20 deletions chain/src/txhashset/desegmenter.rs
@@ -467,9 +467,8 @@ impl Desegmenter {
 			}
 		} else {
 			// We have all required bitmap segments and have recreated our local
-			// bitmap, now continue with other segments
-			// TODO: Outputs only for now, just for testing. we'll want to evenly spread
-			// requests among the 3 PMMRs
+			// bitmap, now continue with other segments, evenly spreading requests
+			// among MMRs
 			let local_output_mmr_size;
 			let local_kernel_mmr_size;
 			let local_rangeproof_mmr_size;
@@ -485,21 +484,19 @@
 				self.default_output_segment_height,
 			);

-			let mut elems_added = 0;
 			while let Some(output_id) = output_identifier_iter.next() {
 				// Advance output iterator to next needed position
 				let (_first, last) =
 					output_id.segment_pos_range(self.archive_header.output_mmr_size);
 				if last <= local_output_mmr_size {
 					continue;
 				}
+				// Break if we're full
+				if return_vec.len() > max_elements {
+					break;
+				}

 				if !self.has_output_segment_with_id(output_id) {
 					return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
-					// Let other trees have a chance to put in a segment request
-					elems_added += 1;
 				}
-				if elems_added == max_elements / 3 {
-					break;
-				}
 			}
@@ -509,20 +506,18 @@
 				self.default_rangeproof_segment_height,
 			);

-			elems_added = 0;
 			while let Some(rp_id) = rangeproof_identifier_iter.next() {
 				let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
 				// Advance rangeproof iterator to next needed position
 				if last <= local_rangeproof_mmr_size {
 					continue;
 				}
+				// Break if we're full
+				if return_vec.len() > max_elements {
+					break;
+				}

 				if !self.has_rangeproof_segment_with_id(rp_id) {
 					return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
-					// Let other trees have a chance to put in a segment request
-					elems_added += 1;
 				}
-				if elems_added == max_elements / 3 {
-					break;
-				}
 			}
@@ -532,20 +527,19 @@
 				self.default_kernel_segment_height,
 			);

-			elems_added = 0;
 			while let Some(k_id) = kernel_identifier_iter.next() {
 				// Advance kernel iterator to next needed position
 				let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
 				// Advance rangeproof iterator to next needed position
 				if last <= local_kernel_mmr_size {
 					continue;
 				}
+				// Break if we're full
+				if return_vec.len() > max_elements {
+					break;
+				}

 				if !self.has_kernel_segment_with_id(k_id) {
 					return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
-					elems_added += 1;
 				}
-				if elems_added == max_elements / 3 {
-					break;
-				}
 			}
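
The selection loops above drop the per-PMMR `max_elements / 3` cap and instead fill the request list from outputs, then rangeproofs, then kernels, skipping segments already held and breaking out once `return_vec` has grown past `max_elements`. Below is a minimal standalone sketch of that capping pattern; `SegKind`, `next_desired` and the plain `u64` ids are illustrative stand-ins, not grin's `SegmentTypeIdentifier` or iterator machinery.

// Sketch only: simplified stand-in types, not grin's real ones.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SegKind {
	Output,
	RangeProof,
	Kernel,
}

// Walk each source in turn and collect identifiers we still need, stopping
// once the overall list reaches `max_elements`: a single global cap instead
// of the old per-tree `max_elements / 3` share.
fn next_desired(
	sources: &[(SegKind, Vec<u64>)],
	already_have: impl Fn(SegKind, u64) -> bool,
	max_elements: usize,
) -> Vec<(SegKind, u64)> {
	let mut wanted = Vec::new();
	for (kind, ids) in sources {
		for &id in ids {
			if wanted.len() >= max_elements {
				return wanted;
			}
			if !already_have(*kind, id) {
				wanted.push((*kind, id));
			}
		}
	}
	wanted
}

fn main() {
	let sources: Vec<(SegKind, Vec<u64>)> = vec![
		(SegKind::Output, (0..10).collect()),
		(SegKind::RangeProof, (0..10).collect()),
		(SegKind::Kernel, (0..10).collect()),
	];
	// Pretend the first three output segments are already on disk.
	let wanted = next_desired(&sources, |k, id| k == SegKind::Output && id < 3, 15);
	assert_eq!(wanted.len(), 15);
	println!("{:?}", wanted);
}

With three sources of ten candidates each and a cap of 15, the helper returns exactly 15 identifiers, favouring earlier trees once the cap binds, which is the trade-off the diff accepts in exchange for a simpler loop.
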
52 changes: 32 additions & 20 deletions servers/src/common/adapters.rs
@@ -577,16 +577,11 @@ where
 			block_hash,
 			output_root
 		);
-		// Remove segment from outgoing list TODO: Where is the best place to
-		// do this?
-		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
-			segment_type: SegmentType::Bitmap,
-			identifier: segment.identifier(),
-		});
 		// TODO: Entire process needs to be restarted if the horizon block
 		// has changed (perhaps not here, NB this has to go somewhere)
 		let archive_header = self.chain().txhashset_archive_header_header_only()?;
 		let identifier = segment.identifier().clone();
+		let mut retval = Ok(true);
 		if let Some(d) = self
 			.chain()
 			.desegmenter(&archive_header, self.sync_state.clone())?
@@ -599,10 +594,15 @@ where
 					"Validation of incoming bitmap segment failed: {:?}, reason: {}",
 					identifier, e
 				);
-				return Err(e);
+				retval = Err(e);
 			}
 		}
-		Ok(true)
+		// Remove segment from outgoing list
+		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
+			segment_type: SegmentType::Bitmap,
+			identifier,
+		});
+		retval
 	}

 	fn receive_output_segment(
@@ -617,14 +617,9 @@ where
 			block_hash,
 			bitmap_root,
 		);
-		// Remove segment from outgoing list TODO: Where is the best place to
-		// do this?
-		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
-			segment_type: SegmentType::Output,
-			identifier: segment.identifier(),
-		});
 		let archive_header = self.chain().txhashset_archive_header_header_only()?;
 		let identifier = segment.identifier().clone();
+		let mut retval = Ok(true);
 		if let Some(d) = self
 			.chain()
 			.desegmenter(&archive_header, self.sync_state.clone())?
@@ -637,10 +632,15 @@ where
 					"Validation of incoming output segment failed: {:?}, reason: {}",
 					identifier, e
 				);
-				return Err(e);
+				retval = Err(e);
 			}
 		}
-		Ok(true)
+		// Remove segment from outgoing list
+		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
+			segment_type: SegmentType::Output,
+			identifier,
+		});
+		retval
 	}

 	fn receive_rangeproof_segment(
@@ -655,6 +655,7 @@ where
 		);
 		let archive_header = self.chain().txhashset_archive_header_header_only()?;
 		let identifier = segment.identifier().clone();
+		let mut retval = Ok(true);
 		if let Some(d) = self
 			.chain()
 			.desegmenter(&archive_header, self.sync_state.clone())?
@@ -667,10 +668,15 @@ where
 					"Validation of incoming rangeproof segment failed: {:?}, reason: {}",
 					identifier, e
 				);
-				return Err(e);
+				retval = Err(e);
 			}
 		}
-		Ok(true)
+		// Remove segment from outgoing list
+		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
+			segment_type: SegmentType::RangeProof,
+			identifier,
+		});
+		retval
 	}

 	fn receive_kernel_segment(
@@ -685,6 +691,7 @@ where
 		);
 		let archive_header = self.chain().txhashset_archive_header_header_only()?;
 		let identifier = segment.identifier().clone();
+		let mut retval = Ok(true);
 		if let Some(d) = self
 			.chain()
 			.desegmenter(&archive_header, self.sync_state.clone())?
@@ -697,10 +704,15 @@ where
 					"Validation of incoming rangeproof segment failed: {:?}, reason: {}",
 					identifier, e
 				);
-				return Err(e);
+				retval = Err(e);
 			}
 		}
-		Ok(true)
+		// Remove segment from outgoing list
+		self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier {
+			segment_type: SegmentType::Kernel,
+			identifier,
+		});
+		retval
 	}
 }

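
Each receive_*_segment handler now records the validation outcome in `retval`, removes the segment from the outgoing-request list on both the success and failure paths, and only then returns, rather than returning `Err` early. A small self-contained sketch of that shape follows; `Tracker`, `SegmentId` and the `String` error are hypothetical stand-ins for the adapter, the segment identifier and the chain error type.

// Sketch only: hypothetical types standing in for the adapter and sync state.
use std::collections::HashSet;

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct SegmentId(u64);

struct Tracker {
	outgoing: HashSet<SegmentId>,
}

impl Tracker {
	fn receive_segment(
		&mut self,
		id: SegmentId,
		validate: impl Fn(&SegmentId) -> Result<(), String>,
	) -> Result<bool, String> {
		// Record the outcome instead of returning early on error...
		let retval = match validate(&id) {
			Ok(()) => Ok(true),
			Err(e) => Err(e),
		};
		// ...so the segment always leaves the outgoing list and can be
		// re-requested cleanly after a failed validation.
		self.outgoing.remove(&id);
		retval
	}
}

fn main() {
	let mut t = Tracker { outgoing: HashSet::new() };
	t.outgoing.insert(SegmentId(7));
	let res = t.receive_segment(SegmentId(7), |_| Err("bad root".to_string()));
	assert!(res.is_err());
	assert!(t.outgoing.is_empty()); // cleaned up even though validation failed
}
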
53 changes: 26 additions & 27 deletions servers/src/grin/sync/state_sync.rs
@@ -243,7 +243,7 @@ impl StateSync {
 			// Figure out the next segments we need
 			// (12 is divisible by 3, to try and evenly spread the requests among the 3
 			// main pmmrs. Bitmaps segments will always be requested first)
-			next_segment_ids = d.next_desired_segments(12);
+			next_segment_ids = d.next_desired_segments(15);
 		}

 		// For each segment, pick a desirable peer and send message
@@ -271,34 +271,33 @@
 			});
 			trace!("Chosen peer is {:?}", peer);
 			if let Some(p) = peer {
-				match seg_id.segment_type {
-					SegmentType::Bitmap => p
-						.send_bitmap_segment_request(
-							archive_header.hash(),
-							seg_id.identifier.clone(),
-						)
-						.unwrap(),
-					SegmentType::Output => p
-						.send_output_segment_request(
-							archive_header.hash(),
-							seg_id.identifier.clone(),
-						)
-						.unwrap(),
-					SegmentType::RangeProof => p
-						.send_rangeproof_segment_request(
-							archive_header.hash(),
-							seg_id.identifier.clone(),
-						)
-						.unwrap(),
-					SegmentType::Kernel => p
-						.send_kernel_segment_request(
-							archive_header.hash(),
-							seg_id.identifier.clone(),
-						)
-						.unwrap(),
-				};
 				// add to list of segments that are being tracked
 				self.sync_state.add_pibd_segment(seg_id);
+				let res = match seg_id.segment_type {
+					SegmentType::Bitmap => p.send_bitmap_segment_request(
+						archive_header.hash(),
+						seg_id.identifier.clone(),
+					),
+					SegmentType::Output => p.send_output_segment_request(
+						archive_header.hash(),
+						seg_id.identifier.clone(),
+					),
+					SegmentType::RangeProof => p.send_rangeproof_segment_request(
+						archive_header.hash(),
+						seg_id.identifier.clone(),
+					),
+					SegmentType::Kernel => p.send_kernel_segment_request(
+						archive_header.hash(),
+						seg_id.identifier.clone(),
+					),
+				};
+				if let Err(e) = res {
+					info!(
+						"Error sending request to peer at {}, reason: {:?}",
+						p.info.addr, e
+					);
+					self.sync_state.remove_pibd_segment(seg_id);
+				}
 			}
 		}
 		false
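
The request loop now captures each send_*_segment_request result instead of unwrapping it, logs a failed send, and untracks the segment so it can be requested again later. Below is a compact sketch of that error path; `Peer`, `SegId`, `request_segment`, the address and the plain `Vec` used for tracking are hypothetical stand-ins for grin's peer type and SyncState.

// Sketch only: hypothetical peer and tracking types.
#[derive(Clone, Debug, PartialEq)]
struct SegId(u64);

struct Peer {
	addr: String,
	healthy: bool,
}

impl Peer {
	fn send_segment_request(&self, _id: &SegId) -> Result<(), String> {
		if self.healthy {
			Ok(())
		} else {
			Err("connection dropped".to_string())
		}
	}
}

fn request_segment(peer: &Peer, seg_id: SegId, tracked: &mut Vec<SegId>) {
	// Track the request first, mirroring the add_pibd_segment call before the send.
	tracked.push(seg_id.clone());
	if let Err(e) = peer.send_segment_request(&seg_id) {
		// A failed send is logged and the segment is untracked so it can be
		// handed to a different peer on the next pass.
		eprintln!("Error sending request to peer at {}, reason: {:?}", peer.addr, e);
		tracked.retain(|s| *s != seg_id);
	}
}

fn main() {
	let mut tracked = Vec::new();
	let bad_peer = Peer {
		addr: "10.0.0.1:3414".to_string(),
		healthy: false,
	};
	request_segment(&bad_peer, SegId(42), &mut tracked);
	assert!(tracked.is_empty());
}
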
