Skip to content

Commit

Permalink
Replace chrono::now() with Instant::now() (#5097)
Browse files Browse the repository at this point in the history
* Replace chrono::now() with Instant::now()

* refactor

* refactor

Co-authored-by: Piotr Mikulski <[email protected]>
  • Loading branch information
pmnoxx and pmnoxx authored Oct 29, 2021
1 parent 805bf37 commit 1cb995f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 62 deletions.
34 changes: 16 additions & 18 deletions chain/network/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::collections::btree_map;
use std::collections::{BTreeMap, BTreeSet, HashMap};

use chrono::Utc;
use std::time::{Duration, Instant};

use near_primitives::hash::CryptoHash;
use near_primitives::network::PeerId;

type Time = u64;
type Size = u64;

/// Cache to store route back messages.
Expand Down Expand Up @@ -48,12 +46,12 @@ pub struct RouteBackCache {
/// Maximum number of records allowed in the cache.
capacity: u64,
/// Maximum time allowed before removing a record from the cache.
evict_timeout: u64,
evict_timeout: Duration,
/// Minimum number of records to delete from offending peer when the cache is full.
remove_frequent_min_size: u64,
/// Main map from message hash to time where it was created + target peer
/// Size: O(capacity)
main: HashMap<CryptoHash, (Time, PeerId)>,
main: HashMap<CryptoHash, (Instant, PeerId)>,
/// Number of records allocated by each PeerId.
/// The size is stored with negative sign, to order in PeerId in decreasing order.
/// To avoid handling with negative number all sizes are added by capacity.
Expand All @@ -62,7 +60,7 @@ pub struct RouteBackCache {
/// List of all hashes associated with each PeerId. Hashes within each PeerId
/// are sorted by the time they arrived from older to newer.
/// Size: O(capacity)
record_per_target: BTreeMap<PeerId, BTreeSet<(Time, CryptoHash)>>,
record_per_target: BTreeMap<PeerId, BTreeSet<(Instant, CryptoHash)>>,
}

impl RouteBackCache {
Expand Down Expand Up @@ -117,8 +115,8 @@ impl RouteBackCache {
if self.is_full() {
self.remove_frequent();

let now = Utc::now().timestamp_millis() as Time;
let remove_until = now.saturating_sub(self.evict_timeout);
let now = Instant::now();
let remove_until = now - self.evict_timeout;

let mut remove_empty = vec![];

Expand Down Expand Up @@ -153,7 +151,7 @@ impl RouteBackCache {
}
}

pub fn new(capacity: u64, evict_timeout: u64, remove_frequent_min_size: u64) -> Self {
pub fn new(capacity: u64, evict_timeout: Duration, remove_frequent_min_size: u64) -> Self {
assert!(capacity > 0);

Self {
Expand Down Expand Up @@ -209,7 +207,7 @@ impl RouteBackCache {

self.remove_evicted();

let now = Utc::now().timestamp_millis() as Time;
let now = Instant::now();

self.main.insert(hash, (now, target.clone()));

Expand Down Expand Up @@ -265,7 +263,7 @@ mod test {

#[test]
fn simple() {
let mut cache = RouteBackCache::new(100, 1000000000, 1);
let mut cache = RouteBackCache::new(100, Duration::from_millis(1000000000), 1);
let (peer0, hash0) = create_message(0);

check_consistency(&cache);
Expand All @@ -281,7 +279,7 @@ mod test {
/// Check record is removed after some timeout.
#[test]
fn evicted() {
let mut cache = RouteBackCache::new(1, 1, 1);
let mut cache = RouteBackCache::new(1, Duration::from_millis(1), 1);
let (peer0, hash0) = create_message(0);

cache.insert(hash0, peer0.clone());
Expand All @@ -296,7 +294,7 @@ mod test {
/// Check element is removed after timeout triggered by insert at max capacity.
#[test]
fn insert_evicted() {
let mut cache = RouteBackCache::new(1, 1, 1);
let mut cache = RouteBackCache::new(1, Duration::from_millis(1), 1);
let (peer0, hash0) = create_message(0);
let (peer1, hash1) = create_message(1);

Expand All @@ -313,7 +311,7 @@ mod test {
/// Check element is removed after insert because cache is at max capacity.
#[test]
fn insert_override() {
let mut cache = RouteBackCache::new(1, 1000000000, 1);
let mut cache = RouteBackCache::new(1, Duration::from_millis(1000000000), 1);
let (peer0, hash0) = create_message(0);
let (peer1, hash1) = create_message(1);

Expand All @@ -331,7 +329,7 @@ mod test {
/// Check that old element from peer0 is removed, even while peer1 has more elements.
#[test]
fn prefer_evict() {
let mut cache = RouteBackCache::new(3, 1000, 1);
let mut cache = RouteBackCache::new(3, Duration::from_millis(100), 1);
let (peer0, hash0) = create_message(0);
let (peer1, hash1) = create_message(1);
let (_, hash2) = create_message(2);
Expand All @@ -354,7 +352,7 @@ mod test {
/// Check that older element from peer1 is removed, since evict timeout haven't passed yet.
#[test]
fn prefer_full() {
let mut cache = RouteBackCache::new(3, 100000, 1);
let mut cache = RouteBackCache::new(3, Duration::from_millis(100000), 1);
let (peer0, hash0) = create_message(0);
let (peer1, hash1) = create_message(1);
let (_, hash2) = create_message(2);
Expand All @@ -377,7 +375,7 @@ mod test {
/// Check that older element from peer1 is removed, since evict timeout haven't passed yet.
#[test]
fn remove_all_frequent() {
let mut cache = RouteBackCache::new(3, 100000, 2);
let mut cache = RouteBackCache::new(3, Duration::from_millis(100000), 2);
let (peer0, hash0) = create_message(0);
let (peer1, hash1) = create_message(1);
let (_, hash2) = create_message(2);
Expand All @@ -403,7 +401,7 @@ mod test {
/// initial hashes should be present in the cache after the attack.
#[test]
fn poison_attack() {
let mut cache = RouteBackCache::new(17, 1000000, 1);
let mut cache = RouteBackCache::new(17, Duration::from_millis(1000000), 1);
let mut ix = 0;

let mut peers = vec![];
Expand Down
22 changes: 12 additions & 10 deletions chain/network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ use crate::types::{GetPeerId, GetPeerIdResult, SetAdvOptions};
use crate::types::{RoutingState, RoutingSyncV2, RoutingVersion2};

/// How often to request peers from active peers.
const REQUEST_PEERS_SECS: u64 = 60;
const REQUEST_PEERS_SECS: Duration = Duration::from_millis(60_000);
/// How much time to wait (in milliseconds) after we send update nonce request before disconnecting.
/// This number should be large to handle pair of nodes with high latency.
const WAIT_ON_TRY_UPDATE_NONCE: u64 = 6_000;
const WAIT_ON_TRY_UPDATE_NONCE: Duration = Duration::from_millis(6_000);
/// If we see an edge between us and other peer, but this peer is not a current connection, wait this
/// timeout and in case it didn't become an active peer, broadcast edge removal update.
const WAIT_PEER_BEFORE_REMOVE: u64 = 6_000;
const WAIT_PEER_BEFORE_REMOVE: Duration = Duration::from_millis(6_000);
/// Maximum number an edge can increase between oldest known edge and new proposed edge.
const EDGE_NONCE_BUMP_ALLOWED: u64 = 1_000;
/// Ratio between consecutive attempts to establish connection with another peer.
Expand All @@ -89,7 +89,9 @@ const BROADCAST_EDGES_INTERVAL: Duration = Duration::from_millis(50);
/// Maximum amount of time spend processing edges.
const BROAD_CAST_EDGES_MAX_WORK_ALLOWED: Duration = Duration::from_millis(50);
/// Delay syncinc for 1 second to avoid race condition
const WAIT_FOR_SYNC_DELAY: Duration = Duration::from_secs(1);
const WAIT_FOR_SYNC_DELAY: Duration = Duration::from_millis(1_000);
/// How often should we update the routing table
const UPDATE_ROUTING_TABLE_INTERVAL: Duration = Duration::from_millis(1_000);

macro_rules! unwrap_or_error(($obj: expr, $error: expr) => (match $obj {
Ok(result) => result,
Expand Down Expand Up @@ -219,7 +221,7 @@ impl PeerManagerActor {
ctx: &mut Context<PeerManagerActor>,
can_save_edges: bool,
force_pruning: bool,
timeout: u64,
timeout: Duration,
) {
let edges_to_remove = self.routing_table.update(can_save_edges, force_pruning, timeout);
self.routing_table_pool
Expand Down Expand Up @@ -751,7 +753,7 @@ impl PeerManagerActor {
let mut requests = futures::stream::FuturesUnordered::new();
let msg = SendMessage { message: PeerMessage::PeersRequest };
for (_, active_peer) in self.active_peers.iter_mut() {
if active_peer.last_time_peer_requested.elapsed().as_secs() > REQUEST_PEERS_SECS {
if active_peer.last_time_peer_requested.elapsed() > REQUEST_PEERS_SECS {
active_peer.last_time_peer_requested = Instant::now();
requests.push(active_peer.addr.send(msg.clone()));
}
Expand Down Expand Up @@ -782,7 +784,7 @@ impl PeerManagerActor {
self.scheduled_routing_table_update = true;
near_performance_metrics::actix::run_later(
ctx,
Duration::from_millis(1000),
UPDATE_ROUTING_TABLE_INTERVAL,
|act, ctx2| {
act.scheduled_routing_table_update = false;
// We only want to save prune edges if there are no pending requests to EdgeVerifier
Expand Down Expand Up @@ -834,7 +836,7 @@ impl PeerManagerActor {

near_performance_metrics::actix::run_later(
ctx,
Duration::from_millis(WAIT_PEER_BEFORE_REMOVE),
WAIT_PEER_BEFORE_REMOVE,
move |act, ctx| {
let other = edge.other(&act.peer_id).unwrap();
if !act.active_peers.contains_key(&other) {
Expand Down Expand Up @@ -876,7 +878,7 @@ impl PeerManagerActor {

near_performance_metrics::actix::run_later(
ctx,
Duration::from_millis(WAIT_ON_TRY_UPDATE_NONCE),
WAIT_ON_TRY_UPDATE_NONCE,
move |act, _ctx| {
if let Some(cur_nonce) = act.pending_update_nonce_request.get(&other) {
if *cur_nonce == nonce {
Expand Down Expand Up @@ -1905,7 +1907,7 @@ impl Handler<crate::types::SetRoutingTable> for PeerManagerActor {
}
if let Some(true) = msg.prune_edges {
debug!(target: "network", "test_features prune_edges");
self.update_and_remove_edges(ctx, true, true, 2);
self.update_and_remove_edges(ctx, true, true, Duration::from_secs(2));
}
}
}
Expand Down
40 changes: 19 additions & 21 deletions chain/network/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::ops::Sub;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use std::time::{Duration, Instant};

use cached::{Cached, SizedCache};
use chrono;
use conqueue::{QueueReceiver, QueueSender};
#[cfg(feature = "test_features")]
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -37,16 +35,16 @@ use std::hash::{Hash, Hasher};

const ANNOUNCE_ACCOUNT_CACHE_SIZE: usize = 10_000;
const ROUTE_BACK_CACHE_SIZE: u64 = 100_000;
const ROUTE_BACK_CACHE_EVICT_TIMEOUT: u64 = 120_000; // 120 seconds
const ROUTE_BACK_CACHE_EVICT_TIMEOUT: Duration = Duration::from_millis(120_000);
const ROUTE_BACK_CACHE_REMOVE_BATCH: u64 = 100;
const PING_PONG_CACHE_SIZE: usize = 1_000;
const ROUND_ROBIN_MAX_NONCE_DIFFERENCE_ALLOWED: usize = 10;
const ROUND_ROBIN_NONCE_CACHE_SIZE: usize = 10_000;
/// Routing table will clean edges if there is at least one node that is not reachable
/// since `SAVE_PEERS_MAX_TIME` seconds. All peers disconnected since `SAVE_PEERS_AFTER_TIME`
/// seconds will be removed from cache and persisted in disk.
pub const SAVE_PEERS_MAX_TIME: u64 = 7_200;
pub const SAVE_PEERS_AFTER_TIME: u64 = 3_600;
pub const SAVE_PEERS_MAX_TIME: Duration = Duration::from_millis(7_200);
pub const SAVE_PEERS_AFTER_TIME: Duration = Duration::from_millis(3_600);
/// Graph implementation supports up to 128 peers.
pub const MAX_NUM_PEERS: usize = 128;

Expand Down Expand Up @@ -392,7 +390,7 @@ pub struct RoutingTable {
/// Hash of messages that requires routing back to respective previous hop.
pub route_back: RouteBackCache,
/// Last time a peer with reachable through active edges.
pub peer_last_time_reachable: HashMap<PeerId, chrono::DateTime<chrono::Utc>>,
pub peer_last_time_reachable: HashMap<PeerId, Instant>,
/// Access to store on disk
store: Arc<Store>,
/// Current view of the network. Nodes are Peers and edges are active connections.
Expand Down Expand Up @@ -581,11 +579,8 @@ impl RoutingTable {

if let Ok(cur_nonce) = self.component_nonce_from_peer(peer_id.clone()) {
if cur_nonce == nonce {
self.peer_last_time_reachable.insert(
peer_id.clone(),
chrono::Utc::now()
.sub(chrono::Duration::seconds(SAVE_PEERS_MAX_TIME as i64)),
);
self.peer_last_time_reachable
.insert(peer_id.clone(), Instant::now() - SAVE_PEERS_MAX_TIME);
update
.delete(ColPeerComponent, Vec::from(peer_id.clone()).as_ref());
}
Expand All @@ -599,7 +594,7 @@ impl RoutingTable {
warn!(target: "network", "Error removing network component from store. {:?}", e);
}
} else {
self.peer_last_time_reachable.insert(peer_id.clone(), chrono::Utc::now());
self.peer_last_time_reachable.insert(peer_id.clone(), Instant::now());
}
}

Expand Down Expand Up @@ -751,15 +746,15 @@ impl RoutingTable {
RoutingTableInfo { account_peers, peer_forwarding: self.peer_forwarding.clone() }
}

fn try_save_edges(&mut self, force_pruning: bool, timeout: u64) -> Vec<Edge> {
let now = chrono::Utc::now();
fn try_save_edges(&mut self, force_pruning: bool, timeout: Duration) -> Vec<Edge> {
let now = Instant::now();
let mut oldest_time = now;
let to_save = self
.peer_last_time_reachable
.iter()
.filter_map(|(peer_id, last_time)| {
oldest_time = std::cmp::min(oldest_time, *last_time);
if now.signed_duration_since(*last_time).num_seconds() >= timeout as i64 {
if now.duration_since(*last_time) >= timeout {
Some(peer_id.clone())
} else {
None
Expand All @@ -769,9 +764,7 @@ impl RoutingTable {

// Save nodes on disk and remove from memory only if elapsed time from oldest peer
// is greater than `SAVE_PEERS_MAX_TIME`
if !force_pruning
&& now.signed_duration_since(oldest_time).num_seconds() < SAVE_PEERS_MAX_TIME as i64
{
if !force_pruning && now.duration_since(oldest_time) < SAVE_PEERS_MAX_TIME {
return Vec::new();
}
debug!(target: "network", "try_save_edges: We are going to remove {} peers", to_save.len());
Expand Down Expand Up @@ -813,7 +806,12 @@ impl RoutingTable {
}

/// Recalculate routing table.
pub fn update(&mut self, can_save_edges: bool, force_pruning: bool, timeout: u64) -> Vec<Edge> {
pub fn update(
&mut self,
can_save_edges: bool,
force_pruning: bool,
timeout: Duration,
) -> Vec<Edge> {
#[cfg(feature = "delay_detector")]
let _d = DelayDetector::new("routing table update".into());
let _routing_table_recalculation =
Expand All @@ -823,7 +821,7 @@ impl RoutingTable {

self.peer_forwarding = self.raw_graph.calculate_distance();

let now = chrono::Utc::now();
let now = Instant::now();
for peer in self.peer_forwarding.keys() {
self.peer_last_time_reachable.insert(peer.clone(), now);
}
Expand Down
19 changes: 6 additions & 13 deletions chain/network/tests/cache_edges.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use borsh::de::BorshDeserialize;
use chrono::{DateTime, Duration, Utc};

use near_crypto::Signature;
use near_network::routing::{
Expand All @@ -28,31 +28,24 @@ struct RoutingTableTest {
store: Arc<Store>,
peers: Vec<PeerId>,
rev_peers: HashMap<PeerId, usize>,
times: Vec<DateTime<Utc>>,
times: Vec<Instant>,
}

impl RoutingTableTest {
fn new() -> Self {
let me = random_peer_id();
let store = create_test_store();
let now = Utc::now();
let now = Instant::now();

Self {
routing_table: RoutingTable::new(me.clone(), store.clone()),
store,
peers: vec![me.clone()],
rev_peers: vec![(me, 0)].into_iter().collect(),
times: vec![
now.checked_sub_signed(Duration::seconds((SAVE_PEERS_AFTER_TIME / 2) as i64))
.unwrap(),
now.checked_sub_signed(Duration::seconds(
((SAVE_PEERS_AFTER_TIME + SAVE_PEERS_MAX_TIME) / 2) as i64,
))
.unwrap(),
now.checked_sub_signed(Duration::seconds(
(SAVE_PEERS_MAX_TIME * 3 / 2 - SAVE_PEERS_AFTER_TIME / 2) as i64,
))
.unwrap(),
now - (SAVE_PEERS_AFTER_TIME / 2),
now - (SAVE_PEERS_AFTER_TIME + SAVE_PEERS_MAX_TIME) / 2,
now - (SAVE_PEERS_MAX_TIME * 3 / 2 - SAVE_PEERS_AFTER_TIME / 2),
],
}
}
Expand Down

0 comments on commit 1cb995f

Please sign in to comment.