Skip to content

Commit

Permalink
feat: ttl filter
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Feb 16, 2023
1 parent b11f78c commit 14a41ae
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 15 deletions.
3 changes: 2 additions & 1 deletion sync/src/relayer/transaction_hashes_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ impl<'a> TransactionHashesProcess<'a> {
}

let tx_hashes: Vec<_> = {
let tx_filter = state.tx_filter();
let mut tx_filter = state.tx_filter();
tx_filter.remove_expired();
self.message
.tx_hashes()
.iter()
Expand Down
3 changes: 2 additions & 1 deletion sync/src/relayer/transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ impl<'a> TransactionsProcess<'a> {
let shared_state = self.relayer.shared().state();
let txs: Vec<(TransactionView, Cycle)> = {
// ignore the tx if it's already known or it has never been requested before
let tx_filter = shared_state.tx_filter();
let mut tx_filter = shared_state.tx_filter();
tx_filter.remove_expired();
let unknown_tx_hashes = shared_state.unknown_tx_hashes();

self.message
Expand Down
16 changes: 15 additions & 1 deletion sync/src/tests/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ckb_types::{
use rand::{thread_rng, Rng};
use std::collections::{BTreeMap, HashMap};

use crate::types::HeaderView;
use crate::types::{HeaderView, TtlFilter, FILTER_TTL};

const SKIPLIST_LENGTH: u64 = 10_000;

Expand Down Expand Up @@ -89,3 +89,17 @@ fn test_get_ancestor_use_skip_list() {
assert_eq!(found_0_header.hash(), view_0.hash());
}
}

#[test]
fn ttl_filter() {
let mut filter = TtlFilter::default();
let mut _faketime_guard = ckb_systemtime::faketime();
_faketime_guard.set_faketime(0);
filter.insert(1);
let mut _faketime_guard = ckb_systemtime::faketime();
_faketime_guard.set_faketime(FILTER_TTL * 1000 + 1000);
filter.insert(2);
filter.remove_expired();
assert!(!filter.contains(&1));
assert!(filter.contains(&2));
}
48 changes: 36 additions & 12 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ use crate::utils::send_message;
use ckb_types::core::EpochNumber;
pub use header_map::HeaderMap;

const FILTER_SIZE: usize = 20000;
const GET_HEADERS_CACHE_SIZE: usize = 10000;
// TODO: Need discussed
const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const TX_FILTER_SIZE: usize = 50000;
const FILTER_SIZE: usize = 50000;
const ORPHAN_BLOCK_SIZE: usize = 1024;
// 2 ** 13 < 6 * 1800 < 2 ** 14
const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
const SHRINK_THRESHOLD: usize = 300;
pub(crate) const FILTER_TTL: u64 = 4 * 60 * 60;

// State used to enforce CHAIN_SYNC_TIMEOUT
// Only in effect for connections that are outbound, non-manual,
Expand Down Expand Up @@ -329,20 +329,22 @@ impl PeerState {
}
}

pub struct Filter<T: Eq + Hash> {
inner: LruCache<T, ()>,
pub struct TtlFilter<T> {
inner: LruCache<T, u64>,
ttl: u64,
}

impl<T: Eq + Hash> Default for Filter<T> {
impl<T: Eq + Hash + Clone> Default for TtlFilter<T> {
fn default() -> Self {
Filter::new(FILTER_SIZE)
TtlFilter::new(FILTER_SIZE, FILTER_TTL)
}
}

impl<T: Eq + Hash> Filter<T> {
pub fn new(size: usize) -> Self {
impl<T: Eq + Hash + Clone> TtlFilter<T> {
pub fn new(size: usize, ttl: u64) -> Self {
Self {
inner: LruCache::new(size),
ttl,
}
}

Expand All @@ -351,12 +353,34 @@ impl<T: Eq + Hash> Filter<T> {
}

pub fn insert(&mut self, item: T) -> bool {
self.inner.put(item, ()).is_none()
let now = ckb_systemtime::unix_time().as_secs();
self.inner.put(item, now).is_none()
}

pub fn remove(&mut self, item: &T) -> bool {
self.inner.pop(item).is_some()
}

/// Removes expired items.
pub fn remove_expired(&mut self) {
let now = ckb_systemtime::unix_time().as_secs();
let expired_keys: Vec<T> = self
.inner
.iter()
.filter_map(|(key, time)| {
if *time + self.ttl < now {
Some(key)
} else {
None
}
})
.cloned()
.collect();

for k in expired_keys {
self.remove(&k);
}
}
}

#[derive(Default)]
Expand Down Expand Up @@ -1201,7 +1225,7 @@ impl SyncShared {
shared_best_header,
header_map,
block_status_map: DashMap::new(),
tx_filter: Mutex::new(Filter::new(TX_FILTER_SIZE)),
tx_filter: Mutex::new(TtlFilter::default()),
unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
peers: Peers::default(),
pending_get_block_proposals: DashMap::new(),
Expand Down Expand Up @@ -1528,7 +1552,7 @@ pub struct SyncState {
shared_best_header: RwLock<HeaderView>,
header_map: HeaderMap,
block_status_map: DashMap<Byte32, BlockStatus>,
tx_filter: Mutex<Filter<Byte32>>,
tx_filter: Mutex<TtlFilter<Byte32>>,

// The priority is ordering by timestamp (reversed), means do not ask the tx before this timestamp (timeout).
unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,
Expand Down Expand Up @@ -1754,7 +1778,7 @@ impl SyncState {
self.tx_filter.lock().contains(hash)
}

pub fn tx_filter(&self) -> MutexGuard<Filter<Byte32>> {
pub fn tx_filter(&self) -> MutexGuard<TtlFilter<Byte32>> {
self.tx_filter.lock()
}

Expand Down

0 comments on commit 14a41ae

Please sign in to comment.