From 05ec812016ba3eade5096360b1f92cac899b0242 Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Fri, 13 Dec 2024 13:48:50 +0800
Subject: [PATCH 1/5] Add get_locator_responses

---
 sync/src/types/mod.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs
index 476a5c308b..7c520d0ca2 100644
--- a/sync/src/types/mod.rs
+++ b/sync/src/types/mod.rs
@@ -1909,6 +1909,16 @@ impl ActiveChain {
             .collect()
     }
 
+    pub fn get_locator_responses(
+        &self,
+        block_number: BlockNumber,
+        hash_stop: &Byte32,
+    ) -> Vec<Vec<core::HeaderView>> {
+        (0..32).iter().map(|index| {
+            get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
+        }).collect();
+    }
+
     pub fn send_getheaders_to_peer(
         &self,
         nc: &dyn CKBProtocolContext,

From df6cab7274210ab434e753764fa6cb77be2fdbfb Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Fri, 13 Dec 2024 14:35:48 +0800
Subject: [PATCH 2/5] Send multiple continuous Headers

---
 sync/src/synchronizer/get_headers_process.rs | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs
index 12c5041413..3445f55c07 100644
--- a/sync/src/synchronizer/get_headers_process.rs
+++ b/sync/src/synchronizer/get_headers_process.rs
@@ -74,18 +74,18 @@ impl<'a> GetHeadersProcess<'a> {
             );
             self.synchronizer.peers().getheaders_received(self.peer);
 
-            let headers: Vec<core::HeaderView> =
-                active_chain.get_locator_response(block_number, &hash_stop);
+            let headers_vec: Vec<Vec<core::HeaderView>> =
+                active_chain.get_locator_responses(block_number, &hash_stop);
             // response headers
 
-            debug!("headers len={}", headers.len());
-
-            let content = packed::SendHeaders::new_builder()
-                .headers(headers.into_iter().map(|x| x.data()).pack())
-                .build();
-            let message = packed::SyncMessage::new_builder().set(content).build();
-
-            attempt!(send_message_to(self.nc, self.peer, &message));
+            debug!("headers len={}", headers_vec.len());
+            for headers in headers_vec {
+                let content = packed::SendHeaders::new_builder()
+                    .headers(headers.into_iter().map(|x| x.data()).pack())
+                    .build();
+                let message = packed::SyncMessage::new_builder().set(content).build();
+                attempt!(send_message_to(self.nc, self.peer, &message));
+            }
         } else {
             return StatusCode::GetHeadersMissCommonAncestors
                 .with_context(format!("{block_locator_hashes:#x?}"));

From 7accf535594b84d28184f3864bbdc50ba3c0d89e Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Fri, 13 Dec 2024 15:01:17 +0800
Subject: [PATCH 3/5] Add is_parent_exists for HeadersProcess

---
 sync/src/synchronizer/headers_process.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs
index 9fdeeb94bc..5c3d1bc1da 100644
--- a/sync/src/synchronizer/headers_process.rs
+++ b/sync/src/synchronizer/headers_process.rs
@@ -52,6 +52,12 @@ impl<'a> HeadersProcess<'a> {
         true
     }
 
+    fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool {
+        let shared: &SyncShared = self.synchronizer.shared();
+        shared.get_header_fields(first_header.parent_hash).is_some()
+    }
+
+
     pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
         let shared: &SyncShared = self.synchronizer.shared();
         let verifier = HeaderVerifier::new(shared, shared.consensus());
@@ -128,6 +134,12 @@ impl<'a> HeadersProcess<'a> {
             return StatusCode::HeadersIsInvalid.with_context("not continuous");
         }
 
+        if !self.is_parent_exists(&headers[0]) {
+            // put the headers into a memory cache
+            // verify them later
+            return Status::ok();
+        }
+
         let result = self.accept_first(&headers[0]);
         match result.state {
             ValidationState::Invalid => {

From a12cd964ba5d5bf5416760f043383e53dc2e7983 Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Fri, 13 Dec 2024 15:11:45 +0800
Subject: [PATCH 4/5] Put non-continuous headers into header_cache, and
 re-check it after HeadersProcess succeeds

---
 sync/src/synchronizer/headers_process.rs | 9 +++++++++
 sync/src/synchronizer/mod.rs             | 3 +++
 2 files changed, 12 insertions(+)

diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs
index 5c3d1bc1da..309244da8f 100644
--- a/sync/src/synchronizer/headers_process.rs
+++ b/sync/src/synchronizer/headers_process.rs
@@ -136,6 +136,7 @@ impl<'a> HeadersProcess<'a> {
 
         if !self.is_parent_exists(&headers[0]) {
             // put the headers into a memory cache
+            self.synchronizer.header_cache.insert(headers[0].parent_hash, headers);
             // verify them later
             return Status::ok();
         }
@@ -222,6 +223,14 @@ impl<'a> HeadersProcess<'a> {
             }
         }
 
+        {
+            // these headers verified successfully
+            // does the tail header's hash exist as a key in header_cache?
+            if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){
+                HeadersProcess::new().execute();
+            }
+        }
+
         Status::ok()
     }
 }

diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs
index 83aa1213c3..c716b9f1d0 100644
--- a/sync/src/synchronizer/mod.rs
+++ b/sync/src/synchronizer/mod.rs
@@ -301,6 +301,9 @@ pub struct Synchronizer {
     pub(crate) chain: ChainController,
     /// Sync shared state
     pub shared: Arc<SyncShared>,
+
+    // First header's parent_hash -> Headers
+    pub(crate) header_cache: HashMap<Byte32, Vec<core::HeaderView>>,
     fetch_channel: Option<channel::Sender<FetchCMD>>,
 }
 

From 80d8b6c095261f1e17ccc9d2524ca16949a641c5 Mon Sep 17 00:00:00 2001
From: Eval EXEC
Date: Sat, 14 Dec 2024 13:34:30 +0800
Subject: [PATCH 5/5] wip

Signed-off-by: Eval EXEC
---
 sync/src/synchronizer/get_headers_process.rs | 26 +++++++++++++++----
 sync/src/synchronizer/headers_process.rs     | 27 ++++++++++++++------
 sync/src/synchronizer/mod.rs                 |  6 ++++-
 sync/src/types/mod.rs                        | 17 ++++++++----
 4 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs
index 3445f55c07..d7565c7a5f 100644
--- a/sync/src/synchronizer/get_headers_process.rs
+++ b/sync/src/synchronizer/get_headers_process.rs
@@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> {
             );
             self.synchronizer.peers().getheaders_received(self.peer);
 
-            let headers_vec: Vec<Vec<core::HeaderView>> =
-                active_chain.get_locator_responses(block_number, &hash_stop);
-            // response headers
+            let hash_size: packed::Uint32 = 20_u32.pack();
+            let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
+            if hash_stop.eq(&length_20_for_test) {
+                let headers: Vec<core::HeaderView> =
+                    active_chain.get_locator_response(block_number, &hash_stop);
+                // response headers
 
-            debug!("headers len={}", headers_vec.len());
-            for headers in headers_vec {
+                debug!("headers len={}", headers.len());
+
                 let content = packed::SendHeaders::new_builder()
                     .headers(headers.into_iter().map(|x| x.data()).pack())
                     .build();
                 let message = packed::SyncMessage::new_builder().set(content).build();
                 attempt!(send_message_to(self.nc, self.peer, &message));
+            } else {
+                let headers_vec: Vec<Vec<core::HeaderView>> =
+                    active_chain.get_locator_responses(block_number, &hash_stop);
+                // response headers
+
+                debug!("headers vec len={}", headers_vec.len());
+                for headers in headers_vec {
+                    let content = packed::SendHeaders::new_builder()
+                        .headers(headers.into_iter().map(|x| x.data()).pack())
+                        .build();
+                    let message = packed::SyncMessage::new_builder().set(content).build();
+                    attempt!(send_message_to(self.nc, self.peer, &message));
+                }
             }
         } else {
             return StatusCode::GetHeadersMissCommonAncestors

diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs
index 309244da8f..a5e09ddf31 100644
--- a/sync/src/synchronizer/headers_process.rs
+++ b/sync/src/synchronizer/headers_process.rs
@@ -52,12 +52,13 @@ impl<'a> HeadersProcess<'a> {
         true
     }
 
-    fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool {
+    fn is_parent_exists(&self, first_header: &core::HeaderView) -> bool {
         let shared: &SyncShared = self.synchronizer.shared();
-        shared.get_header_fields(first_header.parent_hash).is_some()
+        shared
+            .get_header_fields(&first_header.parent_hash())
+            .is_some()
     }
-
 
     pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
         let shared: &SyncShared = self.synchronizer.shared();
         let verifier = HeaderVerifier::new(shared, shared.consensus());
@@ -98,8 +99,6 @@ impl<'a> HeadersProcess<'a> {
 
     pub fn execute(self) -> Status {
         debug!("HeadersProcess begins");
-        let shared: &SyncShared = self.synchronizer.shared();
-        let consensus = shared.consensus();
         let headers = self
             .message
             .headers()
             .into_iter()
             .map(packed::Header::into_view)
             .collect::<Vec<_>>();
+        self.execute_inner(headers)
+    }
+
+    fn execute_inner(self, headers: Vec<core::HeaderView>) -> Status {
+        let shared: &SyncShared = self.synchronizer.shared();
+        let consensus = shared.consensus();
 
         if headers.len() > MAX_HEADERS_LEN {
             warn!("HeadersProcess is oversized");
@@ -136,7 +141,9 @@ impl<'a> HeadersProcess<'a> {
 
         if !self.is_parent_exists(&headers[0]) {
             // put the headers into a memory cache
-            self.synchronizer.header_cache.insert(headers[0].parent_hash, headers);
+            self.synchronizer
+                .header_cache
+                .insert(headers[0].parent_hash(), headers);
             // verify them later
             return Status::ok();
         }
@@ -226,8 +233,12 @@ impl<'a> HeadersProcess<'a> {
         {
            // these headers verified successfully
            // does the tail header's hash exist as a key in header_cache?
-            if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){
-                HeadersProcess::new().execute();
+            if let Some(headers) = self
+                .synchronizer
+                .header_cache
+                .get(&headers.last().expect("last header must exist").hash())
+            {
+                return self.execute_inner(headers.to_owned());
             }
         }
 
diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs
index c716b9f1d0..516a1cc1ff 100644
--- a/sync/src/synchronizer/mod.rs
+++ b/sync/src/synchronizer/mod.rs
@@ -44,11 +44,13 @@
 use ckb_systemtime::unix_time_as_millis;
 #[cfg(test)]
 use ckb_types::core;
+use ckb_types::core::HeaderView;
 use ckb_types::{
     core::BlockNumber,
     packed::{self, Byte32},
     prelude::*,
 };
+use dashmap::DashMap;
 use std::{
     collections::HashSet,
     sync::{atomic::Ordering, Arc},
@@ -303,7 +305,7 @@ pub struct Synchronizer {
     pub shared: Arc<SyncShared>,
 
     // First header's parent_hash -> Headers
-    pub(crate) header_cache: HashMap<Byte32, Vec<core::HeaderView>>,
+    pub(crate) header_cache: DashMap<Byte32, Vec<HeaderView>>,
     fetch_channel: Option<channel::Sender<FetchCMD>>,
 }
 
@@ -312,10 +314,12 @@ impl Synchronizer {
     ///
     /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
     pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
+        let header_cache = DashMap::new();
         Synchronizer {
             chain,
             shared,
             fetch_channel: None,
+            header_cache,
         }
     }
 
diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs
index 7c520d0ca2..e24a65b8ab 100644
--- a/sync/src/types/mod.rs
+++ b/sync/src/types/mod.rs
@@ -1912,11 +1912,16 @@ impl ActiveChain {
     pub fn get_locator_responses(
         &self,
         block_number: BlockNumber,
-        hash_stop: &Byte32,
+        _hash_stop: &Byte32,
     ) -> Vec<Vec<core::HeaderView>> {
-        (0..32).iter().map(|index| {
-            get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default())
-        }).collect();
+        (0..32)
+            .map(|index| {
+                self.get_locator_response(
+                    block_number + (index as u64 * MAX_HEADERS_LEN as u64),
+                    &Byte32::default(),
+                )
+            })
+            .collect()
     }
 
     pub fn send_getheaders_to_peer(
         &self,
         nc: &dyn CKBProtocolContext,
@@ -1955,9 +1960,11 @@ impl ActiveChain {
             block_number_and_hash.hash()
         );
         let locator_hash = self.get_locator(block_number_and_hash);
+        let hash_size: packed::Uint32 = 20_u32.pack();
+        let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes());
         let content = packed::GetHeaders::new_builder()
             .block_locator_hashes(locator_hash.pack())
-            .hash_stop(packed::Byte32::zero())
+            .hash_stop(length_20_for_test)
             .build();
         let message = packed::SyncMessage::new_builder().set(content).build();
         let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
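
The caching flow sketched by patches 3-5 (park a batch whose parent is unknown under its first header's parent hash, then after a batch is accepted re-check the cache with the accepted tail hash) can be illustrated outside of CKB with a small self-contained sketch. The `Header`/`Hash` types and the plain `HashMap` below are hypothetical stand-ins chosen for brevity; the real code works on ckb_types::core::HeaderView, Byte32 hashes and a DashMap inside Synchronizer, so this is only a model of the intended ordering, not the ckb-sync API.

use std::collections::{HashMap, HashSet};

// Hypothetical, simplified stand-ins for HeaderView and Byte32.
type Hash = u64;

struct Header {
    hash: Hash,
    parent_hash: Hash,
}

struct HeaderSync {
    // Headers that are already known and verified.
    known: HashSet<Hash>,
    // Parked batches, keyed by the parent hash of their first header.
    header_cache: HashMap<Hash, Vec<Header>>,
}

impl HeaderSync {
    fn new(genesis: Hash) -> Self {
        let mut known = HashSet::new();
        known.insert(genesis);
        HeaderSync {
            known,
            header_cache: HashMap::new(),
        }
    }

    // Park the batch if its parent is unknown; otherwise accept it and
    // drain any parked continuation keyed by the accepted tail hash.
    fn process_batch(&mut self, batch: Vec<Header>) {
        let (parent, tail) = match (batch.first(), batch.last()) {
            (Some(first), Some(last)) => (first.parent_hash, last.hash),
            _ => return,
        };
        if !self.known.contains(&parent) {
            // parent unknown: put the headers into the memory cache and
            // verify them later
            self.header_cache.insert(parent, batch);
            return;
        }
        for header in &batch {
            self.known.insert(header.hash);
        }
        // re-check the cache: a previously parked batch may now connect
        if let Some(next) = self.header_cache.remove(&tail) {
            self.process_batch(next);
        }
    }
}

fn main() {
    let mut sync = HeaderSync::new(0);
    // The continuation (3, 4) arrives before (1, 2), so it is parked first.
    let late = vec![
        Header { hash: 3, parent_hash: 2 },
        Header { hash: 4, parent_hash: 3 },
    ];
    let early = vec![
        Header { hash: 1, parent_hash: 0 },
        Header { hash: 2, parent_hash: 1 },
    ];
    sync.process_batch(late);
    sync.process_batch(early);
    assert!(sync.known.contains(&4));
    println!("connected headers: {}", sync.known.len() - 1);
}

With this ordering, a SendHeaders batch that arrives before its predecessor is not dropped: it connects as soon as the batch ending at its parent is accepted, which is the behaviour the header_cache re-check in patches 4 and 5 appears to be aiming for.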