Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Dec 14, 2024
1 parent a12cd96 commit 397cdcb
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
26 changes: 21 additions & 5 deletions sync/src/synchronizer/get_headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> {
);

self.synchronizer.peers().getheaders_received(self.peer);
let headers_vec: Vec<Vec<core::HeaderView>> =
active_chain.get_locator_responses(block_number, &hash_stop);
// response headers

debug!("headers len={}", headers_vec.len());
for headers in headers_vec {
let length_20_for_test =
packed::Byte32::new_unchecked(packed::Uint32::from(20).as_bytes());
if hash_stop.eq(length_20_for_test) {
let headers: Vec<core::HeaderView> =
active_chain.get_locator_response(block_number, &hash_stop);
// response headers

debug!("headers len={}", headers_vec.len());
let content = packed::SendHeaders::new_builder()
.headers(headers.into_iter().map(|x| x.data()).pack())
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
attempt!(send_message_to(self.nc, self.peer, &message));
} else {
let headers_vec: Vec<Vec<core::HeaderView>> =
active_chain.get_locator_responses(block_number, &hash_stop);
// response headers

debug!("headers vec len={}", headers_vec.len());
for headers in headers_vec {
let content = packed::SendHeaders::new_builder()
.headers(headers.into_iter().map(|x| x.data()).pack())
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
attempt!(send_message_to(self.nc, self.peer, &message));
}
}
} else {
return StatusCode::GetHeadersMissCommonAncestors
Expand Down
16 changes: 10 additions & 6 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<'a> HeadersProcess<'a> {

fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool {
let shared: &SyncShared = self.synchronizer.shared();
shared.get_header_fields(first_header.parent_hash).is_some()
shared.get_header_fields(first_header.parent_hash()).is_some()
}


Expand Down Expand Up @@ -98,15 +98,19 @@ impl<'a> HeadersProcess<'a> {

pub fn execute(self) -> Status {
debug!("HeadersProcess begins");
let shared: &SyncShared = self.synchronizer.shared();
let consensus = shared.consensus();
let headers = self
.message
.headers()
.to_entity()
.into_iter()
.map(packed::Header::into_view)
.collect::<Vec<_>>();
self.execute_inner(headers)
}

fn execute_inner(self, headers: Vec<core::HeaderView>) -> Status {
let shared: &SyncShared = self.synchronizer.shared();
let consensus = shared.consensus();

if headers.len() > MAX_HEADERS_LEN {
warn!("HeadersProcess is oversized");
Expand Down Expand Up @@ -136,7 +140,7 @@ impl<'a> HeadersProcess<'a> {

if !self.is_parent_exists(&headers[0]) {
// put the headers into a memory cache
self.synchronizer.header_cache.insert(headers[0].parent_hash, headers);
self.synchronizer.header_cache.insert(headers[0].parent_hash(), headers);
// verify them later
return Status::ok();
}
Expand Down Expand Up @@ -226,8 +230,8 @@ impl<'a> HeadersProcess<'a> {
{
// these headers verify success
// may the headers's tail header_hash exist in headers_cahce?
if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){
HeadersProcess::new().execute();
if let Some(headers) = self.synchronizer.header_cache.get(headers.last().expect("last header must exist").hash){
return self.execute_inner(headers);
}
}

Expand Down
4 changes: 4 additions & 0 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ use ckb_systemtime::unix_time_as_millis;

#[cfg(test)]
use ckb_types::core;
use ckb_types::packed::Header;
use ckb_types::{
core::BlockNumber,
packed::{self, Byte32},
prelude::*,
};
use std::collections::HashMap;
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
Expand Down Expand Up @@ -312,10 +314,12 @@ impl Synchronizer {
///
/// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
let header_cache = HashMap::new();
Synchronizer {
chain,
shared,
fetch_channel: None,
header_cache,
}
}

Expand Down
12 changes: 8 additions & 4 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1914,9 +1914,12 @@ impl ActiveChain {
block_number: BlockNumber,
hash_stop: &Byte32,
) -> Vec<Vec<core::HeaderView>> {
(0..32).iter().map(|index| {
get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
}).collect();
(0..32)
.iter()
.map(|index| {
get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
})
.collect();
}

pub fn send_getheaders_to_peer(
Expand Down Expand Up @@ -1955,9 +1958,10 @@ impl ActiveChain {
block_number_and_hash.hash()
);
let locator_hash = self.get_locator(block_number_and_hash);
let length_20_for_test = packed::Byte32::new_unchecked(packed::Uint32::from(20).as_bytes());
let content = packed::GetHeaders::new_builder()
.block_locator_hashes(locator_hash.pack())
.hash_stop(packed::Byte32::zero())
.hash_stop(length_20_for_test)
.build();
let message = packed::SyncMessage::new_builder().set(content).build();
let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
Expand Down

0 comments on commit 397cdcb

Please sign in to comment.