From ed5f5beb7b8acb93fa92c9f442b40a83ed858ef8 Mon Sep 17 00:00:00 2001 From: pmnoxx Date: Fri, 29 Oct 2021 14:59:54 -0700 Subject: [PATCH] Replace chrono::now() with Instant::now() (#5097) * Replace chrono::now() with Instant::now() * refactor * refactor Co-authored-by: Piotr Mikulski --- chain/network/src/cache.rs | 34 ++++++++++++------------- chain/network/src/peer_manager.rs | 22 ++++++++-------- chain/network/src/routing.rs | 40 ++++++++++++++---------------- chain/network/tests/cache_edges.rs | 19 +++++--------- 4 files changed, 53 insertions(+), 62 deletions(-) diff --git a/chain/network/src/cache.rs b/chain/network/src/cache.rs index 5be4c855e27..b3dec2d7475 100644 --- a/chain/network/src/cache.rs +++ b/chain/network/src/cache.rs @@ -1,12 +1,10 @@ use std::collections::btree_map; use std::collections::{BTreeMap, BTreeSet, HashMap}; - -use chrono::Utc; +use std::time::{Duration, Instant}; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; -type Time = u64; type Size = u64; /// Cache to store route back messages. @@ -48,12 +46,12 @@ pub struct RouteBackCache { /// Maximum number of records allowed in the cache. capacity: u64, /// Maximum time allowed before removing a record from the cache. - evict_timeout: u64, + evict_timeout: Duration, /// Minimum number of records to delete from offending peer when the cache is full. remove_frequent_min_size: u64, /// Main map from message hash to time where it was created + target peer /// Size: O(capacity) - main: HashMap, + main: HashMap, /// Number of records allocated by each PeerId. /// The size is stored with negative sign, to order in PeerId in decreasing order. /// To avoid handling with negative number all sizes are added by capacity. @@ -62,7 +60,7 @@ pub struct RouteBackCache { /// List of all hashes associated with each PeerId. Hashes within each PeerId /// are sorted by the time they arrived from older to newer. /// Size: O(capacity) - record_per_target: BTreeMap>, + record_per_target: BTreeMap>, } impl RouteBackCache { @@ -117,8 +115,8 @@ impl RouteBackCache { if self.is_full() { self.remove_frequent(); - let now = Utc::now().timestamp_millis() as Time; - let remove_until = now.saturating_sub(self.evict_timeout); + let now = Instant::now(); + let remove_until = now - self.evict_timeout; let mut remove_empty = vec![]; @@ -153,7 +151,7 @@ impl RouteBackCache { } } - pub fn new(capacity: u64, evict_timeout: u64, remove_frequent_min_size: u64) -> Self { + pub fn new(capacity: u64, evict_timeout: Duration, remove_frequent_min_size: u64) -> Self { assert!(capacity > 0); Self { @@ -209,7 +207,7 @@ impl RouteBackCache { self.remove_evicted(); - let now = Utc::now().timestamp_millis() as Time; + let now = Instant::now(); self.main.insert(hash, (now, target.clone())); @@ -265,7 +263,7 @@ mod test { #[test] fn simple() { - let mut cache = RouteBackCache::new(100, 1000000000, 1); + let mut cache = RouteBackCache::new(100, Duration::from_millis(1000000000), 1); let (peer0, hash0) = create_message(0); check_consistency(&cache); @@ -281,7 +279,7 @@ mod test { /// Check record is removed after some timeout. #[test] fn evicted() { - let mut cache = RouteBackCache::new(1, 1, 1); + let mut cache = RouteBackCache::new(1, Duration::from_millis(1), 1); let (peer0, hash0) = create_message(0); cache.insert(hash0, peer0.clone()); @@ -296,7 +294,7 @@ mod test { /// Check element is removed after timeout triggered by insert at max capacity. #[test] fn insert_evicted() { - let mut cache = RouteBackCache::new(1, 1, 1); + let mut cache = RouteBackCache::new(1, Duration::from_millis(1), 1); let (peer0, hash0) = create_message(0); let (peer1, hash1) = create_message(1); @@ -313,7 +311,7 @@ mod test { /// Check element is removed after insert because cache is at max capacity. #[test] fn insert_override() { - let mut cache = RouteBackCache::new(1, 1000000000, 1); + let mut cache = RouteBackCache::new(1, Duration::from_millis(1000000000), 1); let (peer0, hash0) = create_message(0); let (peer1, hash1) = create_message(1); @@ -331,7 +329,7 @@ mod test { /// Check that old element from peer0 is removed, even while peer1 has more elements. #[test] fn prefer_evict() { - let mut cache = RouteBackCache::new(3, 1000, 1); + let mut cache = RouteBackCache::new(3, Duration::from_millis(100), 1); let (peer0, hash0) = create_message(0); let (peer1, hash1) = create_message(1); let (_, hash2) = create_message(2); @@ -354,7 +352,7 @@ mod test { /// Check that older element from peer1 is removed, since evict timeout haven't passed yet. #[test] fn prefer_full() { - let mut cache = RouteBackCache::new(3, 100000, 1); + let mut cache = RouteBackCache::new(3, Duration::from_millis(100000), 1); let (peer0, hash0) = create_message(0); let (peer1, hash1) = create_message(1); let (_, hash2) = create_message(2); @@ -377,7 +375,7 @@ mod test { /// Check that older element from peer1 is removed, since evict timeout haven't passed yet. #[test] fn remove_all_frequent() { - let mut cache = RouteBackCache::new(3, 100000, 2); + let mut cache = RouteBackCache::new(3, Duration::from_millis(100000), 2); let (peer0, hash0) = create_message(0); let (peer1, hash1) = create_message(1); let (_, hash2) = create_message(2); @@ -403,7 +401,7 @@ mod test { /// initial hashes should be present in the cache after the attack. #[test] fn poison_attack() { - let mut cache = RouteBackCache::new(17, 1000000, 1); + let mut cache = RouteBackCache::new(17, Duration::from_millis(1000000), 1); let mut ix = 0; let mut peers = vec![]; diff --git a/chain/network/src/peer_manager.rs b/chain/network/src/peer_manager.rs index 8544ba378e2..47fdbb49f86 100644 --- a/chain/network/src/peer_manager.rs +++ b/chain/network/src/peer_manager.rs @@ -62,13 +62,13 @@ use crate::types::{GetPeerId, GetPeerIdResult, SetAdvOptions}; use crate::types::{RoutingState, RoutingSyncV2, RoutingVersion2}; /// How often to request peers from active peers. -const REQUEST_PEERS_SECS: u64 = 60; +const REQUEST_PEERS_SECS: Duration = Duration::from_millis(60_000); /// How much time to wait (in milliseconds) after we send update nonce request before disconnecting. /// This number should be large to handle pair of nodes with high latency. -const WAIT_ON_TRY_UPDATE_NONCE: u64 = 6_000; +const WAIT_ON_TRY_UPDATE_NONCE: Duration = Duration::from_millis(6_000); /// If we see an edge between us and other peer, but this peer is not a current connection, wait this /// timeout and in case it didn't become an active peer, broadcast edge removal update. -const WAIT_PEER_BEFORE_REMOVE: u64 = 6_000; +const WAIT_PEER_BEFORE_REMOVE: Duration = Duration::from_millis(6_000); /// Maximum number an edge can increase between oldest known edge and new proposed edge. const EDGE_NONCE_BUMP_ALLOWED: u64 = 1_000; /// Ratio between consecutive attempts to establish connection with another peer. @@ -89,7 +89,9 @@ const BROADCAST_EDGES_INTERVAL: Duration = Duration::from_millis(50); /// Maximum amount of time spend processing edges. const BROAD_CAST_EDGES_MAX_WORK_ALLOWED: Duration = Duration::from_millis(50); /// Delay syncinc for 1 second to avoid race condition -const WAIT_FOR_SYNC_DELAY: Duration = Duration::from_secs(1); +const WAIT_FOR_SYNC_DELAY: Duration = Duration::from_millis(1_000); +/// How often should we update the routing table +const UPDATE_ROUTING_TABLE_INTERVAL: Duration = Duration::from_millis(1_000); macro_rules! unwrap_or_error(($obj: expr, $error: expr) => (match $obj { Ok(result) => result, @@ -219,7 +221,7 @@ impl PeerManagerActor { ctx: &mut Context, can_save_edges: bool, force_pruning: bool, - timeout: u64, + timeout: Duration, ) { let edges_to_remove = self.routing_table.update(can_save_edges, force_pruning, timeout); self.routing_table_pool @@ -751,7 +753,7 @@ impl PeerManagerActor { let mut requests = futures::stream::FuturesUnordered::new(); let msg = SendMessage { message: PeerMessage::PeersRequest }; for (_, active_peer) in self.active_peers.iter_mut() { - if active_peer.last_time_peer_requested.elapsed().as_secs() > REQUEST_PEERS_SECS { + if active_peer.last_time_peer_requested.elapsed() > REQUEST_PEERS_SECS { active_peer.last_time_peer_requested = Instant::now(); requests.push(active_peer.addr.send(msg.clone())); } @@ -782,7 +784,7 @@ impl PeerManagerActor { self.scheduled_routing_table_update = true; near_performance_metrics::actix::run_later( ctx, - Duration::from_millis(1000), + UPDATE_ROUTING_TABLE_INTERVAL, |act, ctx2| { act.scheduled_routing_table_update = false; // We only want to save prune edges if there are no pending requests to EdgeVerifier @@ -834,7 +836,7 @@ impl PeerManagerActor { near_performance_metrics::actix::run_later( ctx, - Duration::from_millis(WAIT_PEER_BEFORE_REMOVE), + WAIT_PEER_BEFORE_REMOVE, move |act, ctx| { let other = edge.other(&act.peer_id).unwrap(); if !act.active_peers.contains_key(&other) { @@ -876,7 +878,7 @@ impl PeerManagerActor { near_performance_metrics::actix::run_later( ctx, - Duration::from_millis(WAIT_ON_TRY_UPDATE_NONCE), + WAIT_ON_TRY_UPDATE_NONCE, move |act, _ctx| { if let Some(cur_nonce) = act.pending_update_nonce_request.get(&other) { if *cur_nonce == nonce { @@ -1905,7 +1907,7 @@ impl Handler for PeerManagerActor { } if let Some(true) = msg.prune_edges { debug!(target: "network", "test_features prune_edges"); - self.update_and_remove_edges(ctx, true, true, 2); + self.update_and_remove_edges(ctx, true, true, Duration::from_secs(2)); } } } diff --git a/chain/network/src/routing.rs b/chain/network/src/routing.rs index 17cadf6c712..5458b853540 100644 --- a/chain/network/src/routing.rs +++ b/chain/network/src/routing.rs @@ -1,10 +1,8 @@ use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; -use std::ops::Sub; use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::time::{Duration, Instant}; use cached::{Cached, SizedCache}; -use chrono; use conqueue::{QueueReceiver, QueueSender}; #[cfg(feature = "test_features")] use serde::{Deserialize, Serialize}; @@ -37,7 +35,7 @@ use std::hash::{Hash, Hasher}; const ANNOUNCE_ACCOUNT_CACHE_SIZE: usize = 10_000; const ROUTE_BACK_CACHE_SIZE: u64 = 100_000; -const ROUTE_BACK_CACHE_EVICT_TIMEOUT: u64 = 120_000; // 120 seconds +const ROUTE_BACK_CACHE_EVICT_TIMEOUT: Duration = Duration::from_millis(120_000); const ROUTE_BACK_CACHE_REMOVE_BATCH: u64 = 100; const PING_PONG_CACHE_SIZE: usize = 1_000; const ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED: usize = 10; @@ -45,8 +43,8 @@ const ROUND_ROBIN_NONCE_CACHE_SIZE: usize = 10_000; /// Routing table will clean edges if there is at least one node that is not reachable /// since `SAVE_PEERS_MAX_TIME` seconds. All peers disconnected since `SAVE_PEERS_AFTER_TIME` /// seconds will be removed from cache and persisted in disk. -pub const SAVE_PEERS_MAX_TIME: u64 = 7_200; -pub const SAVE_PEERS_AFTER_TIME: u64 = 3_600; +pub const SAVE_PEERS_MAX_TIME: Duration = Duration::from_millis(7_200); +pub const SAVE_PEERS_AFTER_TIME: Duration = Duration::from_millis(3_600); /// Graph implementation supports up to 128 peers. pub const MAX_NUM_PEERS: usize = 128; @@ -392,7 +390,7 @@ pub struct RoutingTable { /// Hash of messages that requires routing back to respective previous hop. pub route_back: RouteBackCache, /// Last time a peer with reachable through active edges. - pub peer_last_time_reachable: HashMap>, + pub peer_last_time_reachable: HashMap, /// Access to store on disk store: Arc, /// Current view of the network. Nodes are Peers and edges are active connections. @@ -581,11 +579,8 @@ impl RoutingTable { if let Ok(cur_nonce) = self.component_nonce_from_peer(peer_id.clone()) { if cur_nonce == nonce { - self.peer_last_time_reachable.insert( - peer_id.clone(), - chrono::Utc::now() - .sub(chrono::Duration::seconds(SAVE_PEERS_MAX_TIME as i64)), - ); + self.peer_last_time_reachable + .insert(peer_id.clone(), Instant::now() - SAVE_PEERS_MAX_TIME); update .delete(ColPeerComponent, Vec::from(peer_id.clone()).as_ref()); } @@ -599,7 +594,7 @@ impl RoutingTable { warn!(target: "network", "Error removing network component from store. {:?}", e); } } else { - self.peer_last_time_reachable.insert(peer_id.clone(), chrono::Utc::now()); + self.peer_last_time_reachable.insert(peer_id.clone(), Instant::now()); } } @@ -751,15 +746,15 @@ impl RoutingTable { RoutingTableInfo { account_peers, peer_forwarding: self.peer_forwarding.clone() } } - fn try_save_edges(&mut self, force_pruning: bool, timeout: u64) -> Vec { - let now = chrono::Utc::now(); + fn try_save_edges(&mut self, force_pruning: bool, timeout: Duration) -> Vec { + let now = Instant::now(); let mut oldest_time = now; let to_save = self .peer_last_time_reachable .iter() .filter_map(|(peer_id, last_time)| { oldest_time = std::cmp::min(oldest_time, *last_time); - if now.signed_duration_since(*last_time).num_seconds() >= timeout as i64 { + if now.duration_since(*last_time) >= timeout { Some(peer_id.clone()) } else { None @@ -769,9 +764,7 @@ impl RoutingTable { // Save nodes on disk and remove from memory only if elapsed time from oldest peer // is greater than `SAVE_PEERS_MAX_TIME` - if !force_pruning - && now.signed_duration_since(oldest_time).num_seconds() < SAVE_PEERS_MAX_TIME as i64 - { + if !force_pruning && now.duration_since(oldest_time) < SAVE_PEERS_MAX_TIME { return Vec::new(); } debug!(target: "network", "try_save_edges: We are going to remove {} peers", to_save.len()); @@ -813,7 +806,12 @@ impl RoutingTable { } /// Recalculate routing table. - pub fn update(&mut self, can_save_edges: bool, force_pruning: bool, timeout: u64) -> Vec { + pub fn update( + &mut self, + can_save_edges: bool, + force_pruning: bool, + timeout: Duration, + ) -> Vec { #[cfg(feature = "delay_detector")] let _d = DelayDetector::new("routing table update".into()); let _routing_table_recalculation = @@ -823,7 +821,7 @@ impl RoutingTable { self.peer_forwarding = self.raw_graph.calculate_distance(); - let now = chrono::Utc::now(); + let now = Instant::now(); for peer in self.peer_forwarding.keys() { self.peer_last_time_reachable.insert(peer.clone(), now); } diff --git a/chain/network/tests/cache_edges.rs b/chain/network/tests/cache_edges.rs index b01b27275d1..a11de8dff6c 100644 --- a/chain/network/tests/cache_edges.rs +++ b/chain/network/tests/cache_edges.rs @@ -1,8 +1,8 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Instant; use borsh::de::BorshDeserialize; -use chrono::{DateTime, Duration, Utc}; use near_crypto::Signature; use near_network::routing::{ @@ -28,14 +28,14 @@ struct RoutingTableTest { store: Arc, peers: Vec, rev_peers: HashMap, - times: Vec>, + times: Vec, } impl RoutingTableTest { fn new() -> Self { let me = random_peer_id(); let store = create_test_store(); - let now = Utc::now(); + let now = Instant::now(); Self { routing_table: RoutingTable::new(me.clone(), store.clone()), @@ -43,16 +43,9 @@ impl RoutingTableTest { peers: vec![me.clone()], rev_peers: vec![(me, 0)].into_iter().collect(), times: vec![ - now.checked_sub_signed(Duration::seconds((SAVE_PEERS_AFTER_TIME / 2) as i64)) - .unwrap(), - now.checked_sub_signed(Duration::seconds( - ((SAVE_PEERS_AFTER_TIME + SAVE_PEERS_MAX_TIME) / 2) as i64, - )) - .unwrap(), - now.checked_sub_signed(Duration::seconds( - (SAVE_PEERS_MAX_TIME * 3 / 2 - SAVE_PEERS_AFTER_TIME / 2) as i64, - )) - .unwrap(), + now - (SAVE_PEERS_AFTER_TIME / 2), + now - (SAVE_PEERS_AFTER_TIME + SAVE_PEERS_MAX_TIME) / 2, + now - (SAVE_PEERS_MAX_TIME * 3 / 2 - SAVE_PEERS_AFTER_TIME / 2), ], } }