diff --git a/Cargo.lock b/Cargo.lock index 808f992747..385cff6819 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,6 +265,7 @@ dependencies = [ "base64 0.11.0", "byteorder 1.4.3", "bytes 0.5.6", + "common", "env_logger", "fnv", "futures 0.3.15", @@ -904,6 +905,7 @@ dependencies = [ "crossterm", "derive_more", "findshlibs", + "fnv", "fomat-macros 0.2.1", "fomat-macros 0.3.1", "futures 0.1.29", @@ -952,6 +954,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", + "wasm-timer", "web-sys", "winapi", ] diff --git a/docs/HEAPTRACK.md b/docs/HEAPTRACK.md new file mode 100644 index 0000000000..b25d8b901f --- /dev/null +++ b/docs/HEAPTRACK.md @@ -0,0 +1,21 @@ +# Memory profiling MM2 with heaptrack +1. Install dependencies required by heaptrack if they are not already installed on the system +* extra-cmake-modules +* Qt 5.2 or higher: Core, Widgets +* KDE Frameworks 5: CoreAddons, I18n, ItemModels, ThreadWeaver, ConfigWidgets, KIO, IconThemes + +2. Install heaptrack on Ubuntu (18.04) or higher: +``` +sudo apt install heaptrack heaptrack-gui +``` + +3. Use heaptrack to run MM2 binary and pass parameters as usual. An example for this would be: +``` +heaptrack ./mm2 "{\"gui\":\"MM2GUI\",\"netid\":7777, \"userhome\":\"/${HOME#"/"}\", \"passphrase\":\"YOUR_PASSPHRASE_HERE\", \"rpc_password\":\"YOUR_PASSWORD_HERE\",\"i_am_seed\":true}" & +``` +Running heaptrack like this writes a gzipped result file in the same folder the above command ran from. We can now take a look at using the next step. + +4. After running MM2 for sometime we can visualize the memory profiling results using the below command. Note that ```heaptrack.mm2.xxxx.gz``` is the name of the file generated through the above command with numbers instead of xxxx +``` +heaptrack_gui heaptrack.mm2.xxxx.gz +``` \ No newline at end of file diff --git a/mm2src/common/Cargo.toml b/mm2src/common/Cargo.toml index 16e972e7ef..d7fe88d098 100644 --- a/mm2src/common/Cargo.toml +++ b/mm2src/common/Cargo.toml @@ -22,6 +22,7 @@ cfg-if = "1.0" crossbeam = "0.7" derive_more = "0.99" findshlibs = "0.5" +fnv = "1.0.6" fomat-macros = "0.2" futures01 = { version = "0.1", package = "futures" } futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] } @@ -50,6 +51,7 @@ serde_repr = "0.1.6" ser_error = { path = "../derives/ser_error" } ser_error_derive = { path = "../derives/ser_error_derive" } uuid = { version = "0.7", features = ["serde", "v4"] } +wasm-timer = "0.2.4" winapi = "0.3" [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/mm2src/common/common.rs b/mm2src/common/common.rs index a504069517..fe95891dad 100644 --- a/mm2src/common/common.rs +++ b/mm2src/common/common.rs @@ -100,6 +100,7 @@ pub mod mm_number; pub mod privkey; pub mod seri; #[path = "patterns/state_machine.rs"] pub mod state_machine; +pub mod time_cache; #[cfg(target_arch = "wasm32")] pub mod wasm_indexed_db; #[cfg(target_arch = "wasm32")] pub mod wasm_rpc; diff --git a/mm2src/gossipsub/src/time_cache.rs b/mm2src/common/time_cache.rs similarity index 79% rename from mm2src/gossipsub/src/time_cache.rs rename to mm2src/common/time_cache.rs index 8f00ab84a5..6667a20137 100644 --- a/mm2src/gossipsub/src/time_cache.rs +++ b/mm2src/common/time_cache.rs @@ -22,18 +22,27 @@ use fnv::FnvHashMap; use std::collections::hash_map::{self, - Entry::{Occupied, Vacant}}; + Entry::{Occupied, Vacant}, + Iter, Keys}; use std::collections::VecDeque; use std::time::Duration; use wasm_timer::Instant; -struct ExpiringElement { +#[derive(Debug)] +pub struct ExpiringElement { /// The element that expires element: Element, /// The expire time. expires: Instant, } +impl ExpiringElement { + pub fn get_element(&self) -> &Element { &self.element } + + pub fn update_expiration(&mut self, expires: Instant) { self.expires = expires } +} + +#[derive(Debug)] pub struct TimeCache { /// Mapping a key to its value together with its latest expire time (can be updated through /// reinserts). @@ -77,6 +86,17 @@ where }) .element } + + pub fn into_mut_with_update_expiration(mut self) -> &'a mut V { + //We push back an additional element, the first reference in the list will be ignored + // since we also updated the expires in the map, see below. + self.list.push_back(ExpiringElement { + element: self.entry.key().clone(), + expires: self.expiration, + }); + self.entry.get_mut().update_expiration(self.expiration); + &mut self.entry.into_mut().element + } } pub struct VacantEntry<'a, K, V> { @@ -120,6 +140,13 @@ where Entry::Vacant(entry) => entry.insert(default()), } } + + pub fn or_insert_with_update_expiration V>(self, default: F) -> &'a mut V { + match self { + Entry::Occupied(entry) => entry.into_mut_with_update_expiration(), + Entry::Vacant(entry) => entry.insert(default()), + } + } } impl TimeCache @@ -178,6 +205,13 @@ where } } + // Removes a certain key even if it didn't expire plus removing other expired keys + pub fn remove(&mut self, key: Key) -> Option { + let result = self.map.remove(&key).map(|el| el.element); + self.remove_expired_keys(Instant::now()); + result + } + /// Empties the entire cache. #[allow(dead_code)] pub fn clear(&mut self) { @@ -185,17 +219,36 @@ where self.list.clear(); } - pub fn contains_key(&mut self, key: &Key) -> bool { self.map.contains_key(key) } + pub fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) } pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) } pub fn len(&self) -> usize { self.map.len() } + pub fn is_empty(&self) -> bool { self.map.is_empty() } + pub fn ttl(&self) -> Duration { self.ttl } + + pub fn iter(&self) -> Iter> { self.map.iter() } + + pub fn keys(&self) -> Keys> { self.map.keys() } +} + +impl TimeCache +where + Key: Eq + std::hash::Hash + Clone, + Value: Clone, +{ + pub fn as_hash_map(&self) -> std::collections::HashMap { + self.map + .iter() + .map(|(key, expiring_el)| (key.clone(), expiring_el.element.clone())) + .collect() + } } #[allow(dead_code)] -pub struct DuplicateCache(TimeCache); +pub struct DuplicateCache(TimeCache); #[allow(dead_code)] impl DuplicateCache @@ -296,4 +349,14 @@ mod test { // should be removed from the cache assert!(cache.insert("t")); } + + #[test] + fn test_remove() { + let mut cache = TimeCache::new(Duration::from_secs(10)); + + cache.insert("t", ""); + cache.insert("e", ""); + cache.remove("e"); + assert!(!cache.contains_key(&"e")); + } } diff --git a/mm2src/gossipsub/Cargo.toml b/mm2src/gossipsub/Cargo.toml index f79e2976b5..f8ef397e99 100644 --- a/mm2src/gossipsub/Cargo.toml +++ b/mm2src/gossipsub/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] base64 = "0.11.0" bytes = "0.5.4" byteorder = "1.3.2" +common = { path = "../common" } fnv = "1.0.6" futures = "0.3.1" futures_codec = "0.4.0" diff --git a/mm2src/gossipsub/src/behaviour.rs b/mm2src/gossipsub/src/behaviour.rs index 411b6ddf21..c702d907f9 100644 --- a/mm2src/gossipsub/src/behaviour.rs +++ b/mm2src/gossipsub/src/behaviour.rs @@ -23,8 +23,8 @@ use crate::handler::GossipsubHandler; use crate::mcache::MessageCache; use crate::protocol::{GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, MessageId}; -use crate::time_cache::{Entry as TimeCacheEntry, TimeCache}; use crate::topic::{Topic, TopicHash}; +use common::time_cache::{Entry as TimeCacheEntry, TimeCache}; use futures::prelude::*; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler}; diff --git a/mm2src/gossipsub/src/lib.rs b/mm2src/gossipsub/src/lib.rs index e233356a2f..e0efa95571 100644 --- a/mm2src/gossipsub/src/lib.rs +++ b/mm2src/gossipsub/src/lib.rs @@ -141,7 +141,6 @@ mod behaviour; mod config; mod handler; mod mcache; -mod time_cache; mod topic; mod rpc_proto { diff --git a/mm2src/lp_native_dex.rs b/mm2src/lp_native_dex.rs index 50ff199421..ba7811727c 100644 --- a/mm2src/lp_native_dex.rs +++ b/mm2src/lp_native_dex.rs @@ -31,8 +31,8 @@ use std::str; #[cfg(not(target_arch = "wasm32"))] use crate::mm2::database::init_and_migrate_db; use crate::mm2::lp_network::{lp_ports, p2p_event_process_loop, P2PContext}; -use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, lp_ordermatch_loop, orders_kick_start, - BalanceUpdateOrdermatchHandler}; +use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, lp_ordermatch_loop, + orders_kick_start, BalanceUpdateOrdermatchHandler}; use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts}; use crate::mm2::rpc::spawn_rpc; use crate::mm2::{MM_DATETIME, MM_VERSION}; @@ -344,6 +344,8 @@ pub async fn lp_init(ctx: MmArc) -> Result<(), String> { spawn(broadcast_maker_orders_keep_alive_loop(ctx.clone())); + spawn(clean_memory_loop(ctx.clone())); + let ctx_id = try_s!(ctx.ffi_handle()); spawn_rpc(ctx_id); diff --git a/mm2src/lp_ordermatch.rs b/mm2src/lp_ordermatch.rs index c54c2c5f40..cb1818e9bc 100644 --- a/mm2src/lp_ordermatch.rs +++ b/mm2src/lp_ordermatch.rs @@ -29,12 +29,13 @@ use common::executor::{spawn, Timer}; use common::log::error; use common::mm_ctx::{from_ctx, MmArc, MmWeak}; use common::mm_number::{Fraction, MmNumber}; +use common::time_cache::TimeCache; use common::{bits256, json_dir_entries, log, new_uuid, now_ms, remove_file, write}; use derive_more::Display; use futures::{compat::Future01CompatExt, lock::Mutex as AsyncMutex, StreamExt, TryFutureExt}; use gstuff::slurp; use hash256_std_hasher::Hash256StdHasher; -use hash_db::{Hasher, EMPTY_PREFIX}; +use hash_db::Hasher; use http::Response; use keys::AddressFormat; use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicPrefix, TOPIC_SEPARATOR}; @@ -51,6 +52,7 @@ use std::fmt; use std::fs::DirEntry; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use trie_db::NodeCodec as NodeCodecT; use uuid::Uuid; @@ -83,6 +85,14 @@ const TAKER_ORDER_TIMEOUT: u64 = 30; const ORDER_MATCH_TIMEOUT: u64 = 30; const ORDERBOOK_REQUESTING_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 2; const MAX_ORDERS_NUMBER_IN_ORDERBOOK_RESPONSE: usize = 1000; +#[cfg(not(test))] +const TRIE_STATE_HISTORY_TIMEOUT: u64 = 14400; +#[cfg(test)] +const TRIE_STATE_HISTORY_TIMEOUT: u64 = 3; +#[cfg(not(test))] +const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 300; +#[cfg(test)] +const TRIE_ORDER_HISTORY_TIMEOUT: u64 = 3; /// Alphabetically ordered orderbook pair type AlbOrderedOrderbookPair = String; @@ -122,7 +132,7 @@ fn process_pubkey_full_trie( alb_pair: &str, new_trie_orders: PubkeyOrders, ) -> H64 { - remove_and_purge_pubkey_pair_orders(orderbook, pubkey, alb_pair); + remove_pubkey_pair_orders(orderbook, pubkey, alb_pair); for (_uuid, order) in new_trie_orders { orderbook.insert_or_update_order_update_trie(order); @@ -296,24 +306,11 @@ async fn insert_or_update_order(ctx: &MmArc, item: OrderbookItem) { async fn delete_order(ctx: &MmArc, pubkey: &str, uuid: Uuid) { let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).expect("from_ctx failed"); - let mut inactive = ordermatch_ctx.inactive_orders.lock().await; - match inactive.get(&uuid) { - // don't remove the order if the pubkey is not equal - Some(order) if order.pubkey != pubkey => (), - Some(_) => { - inactive.remove(&uuid); - }, - None => (), - } - let mut orderbook = ordermatch_ctx.orderbook.lock().await; - match orderbook.order_set.get(&uuid) { - // don't remove the order if the pubkey is not equal - Some(order) if order.pubkey != pubkey => (), - Some(_) => { + if let Some(order) = orderbook.order_set.get(&uuid) { + if order.pubkey == pubkey { orderbook.remove_order_trie_update(uuid); - }, - None => (), + } } } @@ -323,16 +320,17 @@ async fn delete_my_order(ctx: &MmArc, uuid: Uuid) { orderbook.remove_order_trie_update(uuid); } -fn remove_and_purge_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair: &str) { +fn remove_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, alb_pair: &str) { let pubkey_state = match orderbook.pubkeys_state.get_mut(pubkey) { Some(state) => state, None => return, }; - let pair_root = match pubkey_state.trie_roots.remove(alb_pair) { - Some(root) => root, - None => return, - }; - pubkey_state.order_pairs_trie_state_history.remove(alb_pair); + + if pubkey_state.trie_roots.get(alb_pair).is_none() { + return; + } + + pubkey_state.order_pairs_trie_state_history.remove(alb_pair.into()); let mut orders_to_remove = Vec::with_capacity(pubkey_state.orders_uuids.len()); pubkey_state.orders_uuids.retain(|(uuid, alb)| { @@ -345,15 +343,15 @@ fn remove_and_purge_pubkey_pair_orders(orderbook: &mut Orderbook, pubkey: &str, }); for order in orders_to_remove { - orderbook.remove_order(order); + orderbook.remove_order_trie_update(order); } - if orderbook.memory_db.remove_and_purge(&pair_root, EMPTY_PREFIX).is_none() - && pair_root != H64::default() - && pair_root != hashed_null_node::() - { - log::warn!("Warning: couldn't find {:?} hash root in memory_db", pair_root); - } + let pubkey_state = match orderbook.pubkeys_state.get_mut(pubkey) { + Some(state) => state, + None => return, + }; + + pubkey_state.trie_roots.remove(alb_pair); } /// Attempts to decode a message and process it returning whether the message is valid and worth rebroadcasting @@ -1891,23 +1889,15 @@ enum OrderbookRequestingState { type H64 = [u8; 8]; -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq)] struct TrieDiff { delta: Vec<(Key, Option)>, next_root: H64, } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] struct TrieDiffHistory { - inner: HashMap>, -} - -impl Default for TrieDiffHistory { - fn default() -> Self { - TrieDiffHistory { - inner: Default::default(), - } - } + inner: TimeCache>, } impl TrieDiffHistory { @@ -1917,11 +1907,11 @@ impl TrieDiffHistory { return; } - match self.inner.remove(&diff.next_root) { + match self.inner.remove(diff.next_root) { Some(mut diff) => { // we reached a state that was already reached previously // history can be cleaned up to this state hash - while let Some(next_diff) = self.inner.remove(&diff.next_root) { + while let Some(next_diff) = self.inner.remove(diff.next_root) { diff = next_diff; } }, @@ -1932,29 +1922,42 @@ impl TrieDiffHistory { } #[allow(dead_code)] - fn remove_key(&mut self, key: &H64) { self.inner.remove(key); } + fn remove_key(&mut self, key: H64) { self.inner.remove(key); } #[allow(dead_code)] fn contains_key(&self, key: &H64) -> bool { self.inner.contains_key(key) } fn get(&self, key: &H64) -> Option<&TrieDiff> { self.inner.get(key) } + + #[allow(dead_code)] + fn len(&self) -> usize { self.inner.len() } } type TrieOrderHistory = TrieDiffHistory; -#[derive(Default)] struct OrderbookPubkeyState { /// Timestamp of the latest keep alive message received last_keep_alive: u64, /// The map storing historical data about specific pair subtrie changes /// Used to get diffs of orders of pair between specific root hashes - order_pairs_trie_state_history: HashMap, + order_pairs_trie_state_history: TimeCache, /// The known UUIDs owned by pubkey with alphabetically ordered pair to ease the lookup during pubkey orderbook requests orders_uuids: HashSet<(Uuid, AlbOrderedOrderbookPair)>, /// The map storing alphabetically ordered pair with trie root hash of orders owned by pubkey. trie_roots: HashMap, } +impl OrderbookPubkeyState { + pub fn with_history_timeout(ttl: Duration) -> OrderbookPubkeyState { + OrderbookPubkeyState { + last_keep_alive: now_ms() / 1000, + order_pairs_trie_state_history: TimeCache::new(ttl), + orders_uuids: HashSet::default(), + trie_roots: HashMap::default(), + } + } +} + fn get_trie_mut<'a>( mem_db: &'a mut MemoryDB, root: &'a mut H64, @@ -1973,10 +1976,7 @@ fn pubkey_state_mut<'a>( match state.raw_entry_mut().from_key(from_pubkey) { RawEntryMut::Occupied(e) => e.into_mut(), RawEntryMut::Vacant(e) => { - let state = OrderbookPubkeyState { - last_keep_alive: now_ms() / 1000, - ..OrderbookPubkeyState::default() - }; + let state = OrderbookPubkeyState::with_history_timeout(Duration::new(TRIE_STATE_HISTORY_TIMEOUT, 0)); e.insert(from_pubkey.to_string(), state).1 }, } @@ -1990,13 +1990,14 @@ fn order_pair_root_mut<'a>(state: &'a mut HashMap, } fn pair_history_mut<'a>( - state: &'a mut HashMap, + state: &'a mut TimeCache, pair: &str, ) -> &'a mut TrieOrderHistory { - match state.raw_entry_mut().from_key(pair) { - RawEntryMut::Occupied(e) => e.into_mut(), - RawEntryMut::Vacant(e) => e.insert(pair.to_owned(), Default::default()).1, - } + state + .entry(pair.into()) + .or_insert_with_update_expiration(|| TrieOrderHistory { + inner: TimeCache::new(Duration::from_secs(TRIE_ORDER_HISTORY_TIMEOUT)), + }) } /// `parity_util_mem::malloc_size` crushes for some reason on wasm32 @@ -2007,17 +2008,16 @@ fn collect_orderbook_metrics(_ctx: &MmArc, _orderbook: &Orderbook) {} fn collect_orderbook_metrics(ctx: &MmArc, orderbook: &Orderbook) { use parity_util_mem::malloc_size; - fn history_committed_changes(history: &HashMap) -> i64 { - let total = history - .iter() - .fold(0usize, |total, (_alb_pair, history)| total + history.inner.len()); + fn history_committed_changes(history: &TimeCache) -> i64 { + let total = history.iter().fold(0usize, |total, (_alb_pair, history)| { + total + history.get_element().inner.len() + }); total as i64 } let memory_db_size = malloc_size(&orderbook.memory_db); mm_gauge!(ctx.metrics, "orderbook.len", orderbook.order_set.len() as i64); mm_gauge!(ctx.metrics, "orderbook.memory_db", memory_db_size as i64); - // mm_gauge!(ctx.metrics, "inactive_orders.len", inactive.len() as i64); // TODO remove metrics below after testing for (pubkey, pubkey_state) in orderbook.pubkeys_state.iter() { @@ -2076,24 +2076,25 @@ impl Orderbook { pubkey_state.orders_uuids.insert((order.uuid, alb_ordered.clone())); - let mut pair_trie = match get_trie_mut(&mut self.memory_db, pair_root) { - Ok(trie) => trie, - Err(e) => { - log::error!("Error getting {} trie with root {:?}", e, prev_root); + { + let mut pair_trie = match get_trie_mut(&mut self.memory_db, pair_root) { + Ok(trie) => trie, + Err(e) => { + log::error!("Error getting {} trie with root {:?}", e, prev_root); + return; + }, + }; + let order_bytes = rmp_serde::to_vec(&order).expect("Serialization should never fail"); + if let Err(e) = pair_trie.insert(order.uuid.as_bytes(), &order_bytes) { + log::error!( + "Error {:?} on insertion to trie. Key {}, value {:?}", + e, + order.uuid, + order_bytes + ); return; - }, - }; - let order_bytes = rmp_serde::to_vec(&order).expect("Serialization should never fail"); - if let Err(e) = pair_trie.insert(order.uuid.as_bytes(), &order_bytes) { - log::error!( - "Error {:?} on insertion to trie. Key {}, value {:?}", - e, - order.uuid, - order_bytes - ); - return; - }; - drop(pair_trie); + }; + } if prev_root != H64::default() { let history = pair_history_mut(&mut pubkey_state.order_pairs_trie_state_history, &alb_ordered); @@ -2148,36 +2149,6 @@ impl Orderbook { self.order_set.insert(order.uuid, order); } - fn remove_order(&mut self, uuid: Uuid) -> Option { - let order = match self.order_set.remove(&uuid) { - Some(order) => order, - None => return None, - }; - let base_rel = (order.base.clone(), order.rel.clone()); - - // create an `order_to_delete` that allows to find and remove an element from `self.ordered` by hash - let order_to_delete = OrderedByPriceOrder { - price: order.price.clone().into(), - uuid, - }; - - if let Some(orders) = self.ordered.get_mut(&base_rel) { - orders.remove(&order_to_delete); - if orders.is_empty() { - self.ordered.remove(&base_rel); - } - } - - if let Some(orders) = self.unordered.get_mut(&base_rel) { - // use the same uuid to remove an order - orders.remove(&order_to_delete.uuid); - if orders.is_empty() { - self.unordered.remove(&base_rel); - } - }; - Some(order) - } - fn remove_order_trie_update(&mut self, uuid: Uuid) -> Option { let order = match self.order_set.remove(&uuid) { Some(order) => order, @@ -2225,11 +2196,13 @@ impl Orderbook { }, }; - let history = pair_history_mut(&mut pubkey_state.order_pairs_trie_state_history, &alb_ordered); - history.insert_new_diff(old_state, TrieDiff { - delta: vec![(uuid, None)], - next_root: *pair_state, - }); + if pubkey_state.order_pairs_trie_state_history.get(&alb_ordered).is_some() { + let history = pair_history_mut(&mut pubkey_state.order_pairs_trie_state_history, &alb_ordered); + history.insert_new_diff(old_state, TrieDiff { + delta: vec![(uuid, None)], + next_root: *pair_state, + }); + } Some(order) } @@ -2293,7 +2266,6 @@ struct OrdermatchContext { pub my_taker_orders: AsyncMutex>, pub my_cancelled_orders: AsyncMutex>, pub orderbook: AsyncMutex, - pub inactive_orders: AsyncMutex>, } #[cfg_attr(test, mockable)] @@ -2515,7 +2487,10 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { spawn({ let ctx = ctx.clone(); async move { - maker_order_created_p2p_notify(ctx, &maker_order).await; + if let Ok(Some((_, _))) = find_pair(&ctx, &maker_order.base, &maker_order.rel).await + { + maker_order_created_p2p_notify(ctx, &maker_order).await; + } } }); } else { @@ -2562,25 +2537,22 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { // remove "timed out" pubkeys states with their orders from orderbook let mut orderbook = ordermatch_ctx.orderbook.lock().await; let mut uuids_to_remove = vec![]; - let mut keys_to_remove = vec![]; - orderbook.pubkeys_state.retain(|pubkey, state| { - let to_retain = pubkey == &my_pubsecp || state.last_keep_alive + maker_order_timeout > now_ms() / 1000; - if !to_retain { + let mut pubkeys_to_remove = vec![]; + for (pubkey, state) in orderbook.pubkeys_state.iter() { + let to_keep = pubkey == &my_pubsecp || state.last_keep_alive + maker_order_timeout > now_ms() / 1000; + if !to_keep { for (uuid, _) in &state.orders_uuids { uuids_to_remove.push(*uuid); } - for root in state.trie_roots.values() { - keys_to_remove.push(*root); - } + pubkeys_to_remove.push(pubkey.clone()); } - to_retain - }); - for uuid in uuids_to_remove { - orderbook.remove_order(uuid); } - for key in keys_to_remove { - orderbook.memory_db.remove_and_purge(&key, EMPTY_PREFIX); + for uuid in uuids_to_remove { + orderbook.remove_order_trie_update(uuid); + } + for pubkey in pubkeys_to_remove { + orderbook.pubkeys_state.remove(&pubkey); } collect_orderbook_metrics(&ctx, &orderbook); @@ -2624,6 +2596,20 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { } } +pub async fn clean_memory_loop(ctx: MmArc) { + loop { + if ctx.is_stopping() { + break; + } + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + { + let mut orderbook = ordermatch_ctx.orderbook.lock().await; + orderbook.memory_db.purge(); + } + Timer::sleep(600.).await; + } +} + async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: MakerReserved) { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); let our_public_id = ctx.public_id().unwrap(); diff --git a/mm2src/ordermatch_tests.rs b/mm2src/ordermatch_tests.rs index b38f77fa5a..987e5f2935 100644 --- a/mm2src/ordermatch_tests.rs +++ b/mm2src/ordermatch_tests.rs @@ -2349,40 +2349,36 @@ fn test_orderbook_pubkey_sync_request_relay() { #[test] fn test_trie_diff_avoid_cycle_on_insertion() { - let mut history = TrieDiffHistory::::default(); + let mut history = TrieDiffHistory:: { + inner: TimeCache::new(Duration::from_secs(3600)), + }; history.insert_new_diff([1; 8], TrieDiff { delta: vec![], next_root: [2; 8], }); - history.insert_new_diff([2; 8], TrieDiff { delta: vec![], next_root: [3; 8], }); - history.insert_new_diff([3; 8], TrieDiff { delta: vec![], next_root: [4; 8], }); - history.insert_new_diff([4; 8], TrieDiff { delta: vec![], next_root: [5; 8], }); - history.insert_new_diff([5; 8], TrieDiff { delta: vec![], next_root: [2; 8], }); - let expected = TrieDiffHistory { - inner: HashMap::from_iter(iter::once(([1; 8], TrieDiff { - delta: vec![], - next_root: [2; 8], - }))), - }; + let expected = HashMap::from_iter(iter::once(([1u8; 8], TrieDiff { + delta: vec![], + next_root: [2; 8], + }))); - assert_eq!(expected, history); + assert_eq!(expected, history.inner.as_hash_map()); } #[test] @@ -2547,6 +2543,172 @@ fn test_remove_and_purge_pubkey_pair_orders() { let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); let mut orderbook = block_on(ordermatch_ctx.orderbook.lock()); - remove_and_purge_pubkey_pair_orders(&mut orderbook, &pubkey, &rick_morty_pair); + remove_pubkey_pair_orders(&mut orderbook, &pubkey, &rick_morty_pair); check_if_orderbook_contains_only(&orderbook, &pubkey, &rick_kmd_orders); } + +#[test] +fn test_orderbook_sync_trie_diff_time_cache() { + let (ctx_bob, pubkey_bob, secret_bob) = make_ctx_for_tests(); + let rick_morty_orders = make_random_orders(pubkey_bob.clone(), &secret_bob, "RICK".into(), "MORTY".into(), 15); + + let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); + + for order in &rick_morty_orders[..5] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + std::thread::sleep(Duration::from_secs(3)); + + for order in &rick_morty_orders[5..10] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); + let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); + assert_eq!(rick_morty_history_bob.len(), 5); + + // alice has an outdated state, for which bob doesn't have history anymore as it's expired + let (ctx_alice, ..) = make_ctx_for_tests(); + + for order in &rick_morty_orders[..3] { + block_on(insert_or_update_order(&ctx_alice, order.clone())); + } + + let ordermatch_ctx_alice = OrdermatchContext::from_ctx(&ctx_alice).unwrap(); + let mut orderbook_alice = block_on(ordermatch_ctx_alice.orderbook.lock()); + let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); + + let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); + let bob_root = bob_state.trie_roots.get(&rick_morty_pair).unwrap(); + + let bob_history_on_sync = DeltaOrFullTrie::from_history( + &rick_morty_history_bob, + *alice_root, + *bob_root, + &orderbook_bob.memory_db, + ) + .unwrap(); + + let full_trie = match bob_history_on_sync { + DeltaOrFullTrie::FullTrie(trie) => trie, + _ => panic!("Expected DeltaOrFullTrie::FullTrie"), + }; + + let new_alice_root = process_pubkey_full_trie(&mut orderbook_alice, &pubkey_bob, &rick_morty_pair, full_trie); + + assert_eq!(new_alice_root, *bob_root); + + drop(orderbook_bob); + drop(orderbook_alice); + + for order in &rick_morty_orders[10..] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + let mut orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + + orderbook_bob.remove_order_trie_update(rick_morty_orders[12].uuid); + + let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + let rick_morty_history_bob = bob_state.order_pairs_trie_state_history.get(&rick_morty_pair).unwrap(); + + let mut orderbook_alice = block_on(ordermatch_ctx_alice.orderbook.lock()); + let bob_state_on_alice_side = orderbook_alice.pubkeys_state.get(&pubkey_bob).unwrap(); + + let alice_root = bob_state_on_alice_side.trie_roots.get(&rick_morty_pair).unwrap(); + let bob_root = bob_state.trie_roots.get(&rick_morty_pair).unwrap(); + + let bob_history_on_sync = DeltaOrFullTrie::from_history( + &rick_morty_history_bob, + *alice_root, + *bob_root, + &orderbook_bob.memory_db, + ) + .unwrap(); + + // Check that alice gets orders from history this time + let trie_delta = match bob_history_on_sync { + DeltaOrFullTrie::Delta(delta) => delta, + _ => panic!("Expected DeltaOrFullTrie::Delta"), + }; + + let new_alice_root = process_trie_delta(&mut orderbook_alice, &pubkey_bob, &rick_morty_pair, trie_delta); + assert_eq!(new_alice_root, *bob_root); +} + +#[test] +fn test_orderbook_order_pairs_trie_state_history_updates_expiration_on_insert() { + let (ctx_bob, pubkey_bob, secret_bob) = make_ctx_for_tests(); + let rick_morty_orders = make_random_orders(pubkey_bob.clone(), &secret_bob, "RICK".into(), "MORTY".into(), 15); + + let rick_morty_pair = alb_ordered_pair("RICK", "MORTY"); + + for order in &rick_morty_orders[..5] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + // After 3 seconds RICK:MORTY pair trie state history will time out and will be empty + std::thread::sleep(Duration::from_secs(3)); + + // Insert some more orders to remove expired timecache RICK:MORTY key + for order in &rick_morty_orders[5..10] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); + let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + + // Only the last inserted 5 orders are found + assert_eq!( + bob_state + .order_pairs_trie_state_history + .get(&rick_morty_pair) + .unwrap() + .len(), + 5 + ); + + drop(orderbook_bob); + + std::thread::sleep(Duration::from_secs(2)); + + // On inserting 5 more orders expiration for RICK:MORTY pair trie state history will be reset + for order in &rick_morty_orders[10..] { + block_on(insert_or_update_order(&ctx_bob, order.clone())); + } + + let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); + let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + + assert_eq!( + bob_state + .order_pairs_trie_state_history + .get(&rick_morty_pair) + .unwrap() + .len(), + 10 + ); + + drop(orderbook_bob); + + std::thread::sleep(Duration::from_secs(1)); + + let ordermatch_ctx_bob = OrdermatchContext::from_ctx(&ctx_bob).unwrap(); + let orderbook_bob = block_on(ordermatch_ctx_bob.orderbook.lock()); + let bob_state = orderbook_bob.pubkeys_state.get(&pubkey_bob).unwrap(); + + // After 3 seconds from inserting orders number 6-10 these orders have not expired due to updated expiration on inserting orders 11-15 + assert_eq!( + bob_state + .order_pairs_trie_state_history + .get(&rick_morty_pair) + .unwrap() + .len(), + 10 + ); +}