[PIBD_IMPL] Introduce PIBD state into sync workflow #3685

Merged: 6 commits, Jan 12, 2022
11 changes: 11 additions & 0 deletions chain/src/chain.rs
@@ -923,6 +923,17 @@ impl Chain {
self.get_header_by_height(txhashset_height)
}

/// Return the Block Header at the txhashset horizon, considering only the
/// contents of the header PMMR
pub fn txhashset_archive_header_header_only(&self) -> Result<BlockHeader, Error> {
let header_head = self.header_head()?;
let threshold = global::state_sync_threshold() as u64;
let archive_interval = global::txhashset_archive_interval();
let mut txhashset_height = header_head.height.saturating_sub(threshold);
txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval);
self.get_header_by_height(txhashset_height)
}

// Special handling to make sure the whole kernel set matches each of its
// roots in each block header, without truncation. We go back header by
// header, rewind and check each root. This fixes a potential weakness in
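For reference, the height selection in the new `txhashset_archive_header_header_only` is simply "step back `state_sync_threshold` blocks from the header tip, then round down to the previous archive checkpoint". A minimal standalone sketch of that arithmetic (the parameter values below are made up for illustration; the real ones come from `global::state_sync_threshold()` and `global::txhashset_archive_interval()`):

```rust
/// Sketch only (not part of the PR): the height arithmetic used by
/// `txhashset_archive_header_header_only`.
fn archive_height(header_tip_height: u64, threshold: u64, archive_interval: u64) -> u64 {
	// Step back `threshold` blocks from the tip of the header chain...
	let h = header_tip_height.saturating_sub(threshold);
	// ...then round down to a multiple of the archive interval.
	h.saturating_sub(h % archive_interval)
}

fn main() {
	// Illustrative numbers only: 10_000 - 100 = 9_900, and 9_900 rounded
	// down to a multiple of 720 is 9_360.
	assert_eq!(archive_height(10_000, 100, 720), 9_360);
}
```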
7 changes: 4 additions & 3 deletions chain/src/txhashset/bitmap_accumulator.rs
@@ -194,10 +194,11 @@ impl BitmapAccumulator {
/// Return a raw in-memory bitmap of this accumulator
pub fn as_bitmap(&self) -> Result<Bitmap, Error> {
let mut bitmap = Bitmap::create();
for (chunk_count, chunk_index) in self.backend.leaf_idx_iter(0).enumerate() {
for (chunk_index, chunk_pos) in self.backend.leaf_pos_iter().enumerate() {
//TODO: Unwrap
let chunk = self.backend.get_data(chunk_index).unwrap();
bitmap.add_many(&chunk.set_iter(chunk_count * 1024).collect::<Vec<u32>>());
let chunk = self.backend.get_data(chunk_pos as u64).unwrap();
let additive = chunk.set_iter(chunk_index * 1024).collect::<Vec<u32>>();
bitmap.add_many(&additive);
}
Ok(bitmap)
}
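The important detail in the `as_bitmap` change is that each chunk's set bits are shifted by `chunk_index * 1024` when merged into the combined bitmap, i.e. each accumulator chunk covers a fixed 1024-bit window of the overall output set. A standalone sketch of that offset mapping (the helper name `global_positions` is purely illustrative):

```rust
/// Sketch: map bit indices that are local to one 1024-bit chunk onto
/// positions in the full accumulator bitmap.
fn global_positions(chunk_index: u32, set_bits_in_chunk: &[u32]) -> Vec<u32> {
	set_bits_in_chunk
		.iter()
		.map(|bit| chunk_index * 1024 + bit)
		.collect()
}

fn main() {
	// Bits 3 and 7 of chunk 2 land at global positions 2051 and 2055.
	assert_eq!(global_positions(2, &[3, 7]), vec![2051, 2055]);
}
```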
20 changes: 16 additions & 4 deletions chain/src/txhashset/desegmenter.rs
@@ -30,6 +30,10 @@ use crate::txhashset;

use croaring::Bitmap;

/// States that the desegmenter can be in, to keep track of what
/// parts are needed next in the process
pub enum DesegmenterState {}

/// Desegmenter for rebuilding a txhashset from PIBD segments
#[derive(Clone)]
pub struct Desegmenter {
@@ -129,10 +133,18 @@ impl Desegmenter {
self.bitmap_mmr_leaf_count
);
// Total size of Bitmap PMMR
self.bitmap_mmr_size = pmmr::peaks(self.bitmap_mmr_leaf_count)
.last()
.unwrap_or(&pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count))
.clone();
self.bitmap_mmr_size =
1 + pmmr::peaks(pmmr::insertion_to_pmmr_index(self.bitmap_mmr_leaf_count))
.last()
.unwrap_or(
&(pmmr::peaks(pmmr::insertion_to_pmmr_index(
self.bitmap_mmr_leaf_count - 1,
))
.last()
.unwrap()),
)
.clone();

debug!(
"pibd_desgmenter - expected size of bitmap MMR: {}",
self.bitmap_mmr_size
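The new `bitmap_mmr_size` expression derives the expected size of the bitmap MMR from its leaf count by taking one past the last peak position. As a sanity check on what that should evaluate to, the total node count of an MMR with `n` leaves is `2n - popcount(n)`; a small sketch of that identity (not the code path used above):

```rust
/// Sketch: total number of nodes in an MMR containing `n` leaves,
/// via the identity size = 2n - popcount(n).
fn mmr_size_from_leaf_count(n: u64) -> u64 {
	2 * n - u64::from(n.count_ones())
}

fn main() {
	// 1 leaf -> 1 node, 2 leaves -> 3 nodes, 4 leaves -> 7 nodes, 7 leaves -> 11 nodes.
	assert_eq!(mmr_size_from_leaf_count(1), 1);
	assert_eq!(mmr_size_from_leaf_count(2), 3);
	assert_eq!(mmr_size_from_leaf_count(4), 7);
	assert_eq!(mmr_size_from_leaf_count(7), 11);
}
```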
8 changes: 8 additions & 0 deletions chain/src/types.rs
@@ -56,6 +56,14 @@ pub enum SyncStatus {
/// diff of the most advanced peer
highest_diff: Difficulty,
},
/// Performing PIBD reconstruction of txhashset
/// If the PIBD syncer determines there aren't enough
/// PIBD peers to continue, it moves on to the TxHashsetDownload state
TxHashsetPibd {
/// Whether the syncer has determined there's not enough
/// data to continue via PIBD
aborted: bool,
},
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
/// Setting up before validation
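Callers can branch on the new variant the same way the sync loop does further down in this diff; a minimal self-contained sketch (the enum stub below stands in for the real `SyncStatus`):

```rust
// Stand-in for the real SyncStatus, reduced to the variant added above.
enum SyncStatus {
	TxHashsetPibd { aborted: bool },
}

fn pibd_aborted(status: &SyncStatus) -> bool {
	matches!(status, SyncStatus::TxHashsetPibd { aborted: true })
}

fn main() {
	assert!(pibd_aborted(&SyncStatus::TxHashsetPibd { aborted: true }));
	assert!(!pibd_aborted(&SyncStatus::TxHashsetPibd { aborted: false }));
}
```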
28 changes: 27 additions & 1 deletion p2p/src/peer.rs
@@ -31,7 +31,9 @@ use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Msg, Ping, TxHashSetRequest, Type};
use crate::msg::{
self, BanReason, GetPeerAddrs, Locator, Msg, Ping, SegmentRequest, TxHashSetRequest, Type,
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
@@ -371,6 +373,20 @@ impl Peer {
)
}

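/// Sends a request to the peer for a segment of the output bitmap of the block with the given hash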
pub fn send_bitmap_segment_request(
&self,
h: Hash,
identifier: SegmentIdentifier,
) -> Result<(), Error> {
self.send(
&SegmentRequest {
block_hash: h,
identifier,
},
msg::Type::GetOutputBitmapSegment,
)
}

/// Stops the peer
pub fn stop(&self) {
debug!("Stopping peer {:?}", self.info.addr);
@@ -586,6 +602,16 @@ impl ChainAdapter for TrackingAdapter {
) -> Result<Segment<RangeProof>, chain::Error> {
self.adapter.get_rangeproof_segment(hash, id)
}

fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
self.adapter
.receive_bitmap_segment(block_hash, output_root, segment)
}
}

impl NetAdapter for TrackingAdapter {
10 changes: 10 additions & 0 deletions p2p/src/peers.rs
@@ -669,6 +669,16 @@ impl ChainAdapter for Peers {
) -> Result<Segment<RangeProof>, chain::Error> {
self.adapter.get_rangeproof_segment(hash, id)
}

fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
self.adapter
.receive_bitmap_segment(block_hash, output_root, segment)
}
}

impl NetAdapter for Peers {
16 changes: 14 additions & 2 deletions p2p/src/protocol.rs
@@ -371,8 +371,20 @@ impl MessageHandler for Protocol {
Consumed::None
}
}
Message::OutputBitmapSegment(_)
| Message::OutputSegment(_)
Message::OutputBitmapSegment(req) => {
let OutputBitmapSegmentResponse {
block_hash,
segment,
output_root,
} = req;
debug!(
"Received Output Bitmap Segment: bh, output_root: {}, {}",
block_hash, output_root
);
adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?;
Consumed::None
}
Message::OutputSegment(_)
| Message::RangeProofSegment(_)
| Message::KernelSegment(_) => Consumed::None,

9 changes: 9 additions & 0 deletions p2p/src/serv.rs
@@ -410,6 +410,15 @@ impl ChainAdapter for DummyAdapter {
) -> Result<Segment<RangeProof>, chain::Error> {
unimplemented!()
}

fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
unimplemented!()
}
}

impl NetAdapter for DummyAdapter {
7 changes: 7 additions & 0 deletions p2p/src/types.rs
@@ -667,6 +667,13 @@ pub trait ChainAdapter: Sync + Send {
hash: Hash,
id: SegmentIdentifier,
) -> Result<Segment<RangeProof>, chain::Error>;

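/// A segment of the (PIBD) output bitmap has been received from a peer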
fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error>;
}

/// Additional methods required by the protocol that don't need to be
18 changes: 18 additions & 0 deletions servers/src/common/adapters.rs
@@ -564,6 +564,24 @@ where
}
segmenter.rangeproof_segment(id)
}

fn receive_bitmap_segment(
&self,
block_hash: Hash,
output_root: Hash,
segment: Segment<BitmapChunk>,
) -> Result<bool, chain::Error> {
debug!(
"RECEIVED BITMAP SEGMENT FOR block_hash: {}, output_root: {}",
block_hash, output_root
);
// TODO: Entire process needs to be restarted if the horizon block
// has changed (perhaps not here, but it needs handling somewhere)
let archive_header = self.chain().txhashset_archive_header_header_only()?;
let mut desegmenter = self.chain().desegmenter(&archive_header)?;
desegmenter.add_bitmap_segment(segment, output_root)?;
Ok(true)
}
}

impl<B, P> NetToChainAdapter<B, P>
127 changes: 99 additions & 28 deletions servers/src/grin/sync/state_sync.rs
@@ -17,7 +17,7 @@ use chrono::Duration;
use std::sync::Arc;

use crate::chain::{self, SyncState, SyncStatus};
use crate::core::core::hash::Hashed;
use crate::core::core::{hash::Hashed, pmmr::segment::SegmentIdentifier};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p::{self, Capabilities, Peer};
@@ -35,6 +35,8 @@ pub struct StateSync {

prev_state_sync: Option<DateTime<Utc>>,
state_sync_peer: Option<Arc<Peer>>,

sent_test_pibd_message: bool,
}

impl StateSync {
@@ -49,6 +51,7 @@
chain,
prev_state_sync: None,
state_sync_peer: None,
sent_test_pibd_message: false,
}
}

@@ -74,15 +77,31 @@
sync_need_restart = true;
}

// Determine whether we're going to try using PIBD or whether we've already given up
// on it
let using_pibd =
if let SyncStatus::TxHashsetPibd { aborted: true, .. } = self.sync_state.status() {
false
} else {
// Only on testing chains for now
global::get_chain_type() != global::ChainTypes::Mainnet
};

// check peer connection status of this sync
if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
"state_sync: peer connection lost: {:?}. restart",
peer.info.addr,
);
if !using_pibd {
if let Some(ref peer) = self.state_sync_peer {
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if !peer.is_connected() {
sync_need_restart = true;
info!(
"state_sync: peer connection lost: {:?}. restart",
peer.info.addr,
);
}
}
}
}
@@ -111,35 +130,87 @@

// run fast sync if applicable, normally only run one-time, except restart in error
if sync_need_restart || header_head.height == highest_height {
let (go, download_timeout) = self.state_sync_due();

if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state.set_sync_error(
chain::ErrorKind::SyncError(format!("{:?}", p2p::Error::Timeout)).into(),
);
if using_pibd {
let (launch, _download_timeout) = self.state_sync_due();
if launch {
self.sync_state
.update(SyncStatus::TxHashsetPibd { aborted: false });
}
}
// Continue our PIBD process
self.continue_pibd();
} else {
let (go, download_timeout) = self.state_sync_due();

if go {
self.state_sync_peer = None;
match self.request_state(&header_head) {
Ok(peer) => {
self.state_sync_peer = Some(peer);
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
if download_timeout {
error!("state_sync: TxHashsetDownload status timeout in 10 minutes!");
self.sync_state.set_sync_error(
chain::ErrorKind::SyncError(format!("{:?}", p2p::Error::Timeout))
.into(),
);
}
Err(e) => self
.sync_state
.set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()),
}

self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
if go {
self.state_sync_peer = None;
match self.request_state(&header_head) {
Ok(peer) => {
self.state_sync_peer = Some(peer);
}
Err(e) => self
.sync_state
.set_sync_error(chain::ErrorKind::SyncError(format!("{:?}", e)).into()),
}

self.sync_state
.update(SyncStatus::TxHashsetDownload(Default::default()));
}
}
}
true
}

fn continue_pibd(&mut self) {
// Check the state of our chain to figure out what we should be requesting next
// TODO: Just faking a single request for testing
if !self.sent_test_pibd_message {
debug!("Sending test PIBD message");
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();

let target_segment_height = 11;
//let archive_header = self.chain.txhashset_archive_header().unwrap();
let desegmenter = self.chain.desegmenter(&archive_header).unwrap();
let bitmap_mmr_size = desegmenter.expected_bitmap_mmr_size();
let mut identifier_iter =
SegmentIdentifier::traversal_iter(bitmap_mmr_size, target_segment_height);

self.sent_test_pibd_message = true;
let peers_iter = || {
self.peers
.iter()
.with_capabilities(Capabilities::PIBD_HIST)
.connected()
};

// Filter peers further based on max difficulty.
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
let peers_iter = || peers_iter().with_difficulty(|x| x >= max_diff);
// Choose a random "most work" peer, preferring outbound if at all possible.
let peer = peers_iter().outbound().choose_random().or_else(|| {
warn!("no suitable outbound peer for pibd message, considering inbound");
peers_iter().inbound().choose_random()
});
debug!("Chosen peer is {:?}", peer);
if let Some(p) = peer {
p.send_bitmap_segment_request(
archive_header.hash(),
identifier_iter.next().unwrap(),
)
.unwrap();
}
}
}

fn request_state(&self, header_head: &chain::Tip) -> Result<Arc<Peer>, p2p::Error> {
let threshold = global::state_sync_threshold() as u64;
let archive_interval = global::txhashset_archive_interval();
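To put the single test request in `continue_pibd` in perspective: `SegmentIdentifier::traversal_iter` walks every segment of the bitmap MMR, so a full PIBD pass would issue one `send_bitmap_segment_request` per identifier rather than just the first. Assuming a segment of height `h` covers `2^h` leaves (which is what the fixed `target_segment_height` of 11 implies), the number of requests can be estimated as below; the helper is purely illustrative:

```rust
/// Sketch: estimated number of bitmap segment requests, assuming each
/// segment of height `h` holds 2^h leaves.
fn segment_count(n_leaves: u64, segment_height: u32) -> u64 {
	let leaves_per_segment = 1u64 << segment_height;
	// Ceiling division: a partially-filled final segment still needs a request.
	(n_leaves + leaves_per_segment - 1) / leaves_per_segment
}

fn main() {
	// With segment height 11 (2048 leaves per segment), one million bitmap
	// leaves translate to 489 segment requests.
	assert_eq!(segment_count(1_000_000, 11), 489);
}
```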