Skip to content
Merged
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions docs/HEAPTRACK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Memory profiling MM2 with heaptrack
1. Install dependencies required by heaptrack if they are not already installed on the system
* extra-cmake-modules
* Qt 5.2 or higher: Core, Widgets
* KDE Frameworks 5: CoreAddons, I18n, ItemModels, ThreadWeaver, ConfigWidgets, KIO, IconThemes

2. Install heaptrack on Ubuntu (18.04) or higher:
```
sudo apt install heaptrack heaptrack-gui
```

3. Use heaptrack to run MM2 binary and pass parameters as usual. An example for this would be:
```
heaptrack ./mm2 "{\"gui\":\"MM2GUI\",\"netid\":7777, \"userhome\":\"/${HOME#"/"}\", \"passphrase\":\"YOUR_PASSPHRASE_HERE\", \"rpc_password\":\"YOUR_PASSWORD_HERE\",\"i_am_seed\":true}" &
```
Running heaptrack like this writes a gzipped result file in the same folder the above command ran from. We can now take a look at using the next step.

4. After running MM2 for sometime we can visualize the memory profiling results using the below command. Note that ```heaptrack.mm2.xxxx.gz``` is the name of the file generated through the above command with numbers instead of xxxx
```
heaptrack_gui heaptrack.mm2.xxxx.gz
```
2 changes: 2 additions & 0 deletions mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ cfg-if = "1.0"
crossbeam = "0.7"
derive_more = "0.99"
findshlibs = "0.5"
fnv = "1.0.6"
fomat-macros = "0.2"
futures01 = { version = "0.1", package = "futures" }
futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] }
Expand Down Expand Up @@ -50,6 +51,7 @@ serde_repr = "0.1.6"
ser_error = { path = "../derives/ser_error" }
ser_error_derive = { path = "../derives/ser_error_derive" }
uuid = { version = "0.7", features = ["serde", "v4"] }
wasm-timer = "0.2.4"
winapi = "0.3"

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub mod mm_number;
pub mod privkey;
pub mod seri;
#[path = "patterns/state_machine.rs"] pub mod state_machine;
pub mod time_cache;

#[cfg(target_arch = "wasm32")] pub mod wasm_indexed_db;
#[cfg(target_arch = "wasm32")] pub mod wasm_rpc;
Expand Down
71 changes: 67 additions & 4 deletions mm2src/gossipsub/src/time_cache.rs → mm2src/common/time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@

use fnv::FnvHashMap;
use std::collections::hash_map::{self,
Entry::{Occupied, Vacant}};
Entry::{Occupied, Vacant},
Iter, Keys};
use std::collections::VecDeque;
use std::time::Duration;
use wasm_timer::Instant;

struct ExpiringElement<Element> {
#[derive(Debug)]
pub struct ExpiringElement<Element> {
/// The element that expires
element: Element,
/// The expire time.
expires: Instant,
}

impl<Element> ExpiringElement<Element> {
pub fn get_element(&self) -> &Element { &self.element }

pub fn update_expiration(&mut self, expires: Instant) { self.expires = expires }
}

#[derive(Debug)]
pub struct TimeCache<Key, Value> {
/// Mapping a key to its value together with its latest expire time (can be updated through
/// reinserts).
Expand Down Expand Up @@ -77,6 +86,17 @@ where
})
.element
}

pub fn into_mut_with_update_expiration(mut self) -> &'a mut V {
//We push back an additional element, the first reference in the list will be ignored
// since we also updated the expires in the map, see below.
self.list.push_back(ExpiringElement {
element: self.entry.key().clone(),
expires: self.expiration,
});
self.entry.get_mut().update_expiration(self.expiration);
&mut self.entry.into_mut().element
}
}

pub struct VacantEntry<'a, K, V> {
Expand Down Expand Up @@ -120,6 +140,13 @@ where
Entry::Vacant(entry) => entry.insert(default()),
}
}

pub fn or_insert_with_update_expiration<F: FnOnce() -> V>(self, default: F) -> &'a mut V {
match self {
Entry::Occupied(entry) => entry.into_mut_with_update_expiration(),
Entry::Vacant(entry) => entry.insert(default()),
}
}
}

impl<Key, Value> TimeCache<Key, Value>
Expand Down Expand Up @@ -178,24 +205,50 @@ where
}
}

// Removes a certain key even if it didn't expire plus removing other expired keys
pub fn remove(&mut self, key: Key) -> Option<Value> {
Comment thread
artemii235 marked this conversation as resolved.
let result = self.map.remove(&key).map(|el| el.element);
self.remove_expired_keys(Instant::now());
result
}

/// Empties the entire cache.
#[allow(dead_code)]
pub fn clear(&mut self) {
self.map.clear();
self.list.clear();
}

pub fn contains_key(&mut self, key: &Key) -> bool { self.map.contains_key(key) }
pub fn contains_key(&self, key: &Key) -> bool { self.map.contains_key(key) }

pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) }

pub fn len(&self) -> usize { self.map.len() }

pub fn is_empty(&self) -> bool { self.map.is_empty() }

pub fn ttl(&self) -> Duration { self.ttl }

pub fn iter(&self) -> Iter<Key, ExpiringElement<Value>> { self.map.iter() }

pub fn keys(&self) -> Keys<Key, ExpiringElement<Value>> { self.map.keys() }
}

impl<Key, Value> TimeCache<Key, Value>
where
Key: Eq + std::hash::Hash + Clone,
Value: Clone,
{
pub fn as_hash_map(&self) -> std::collections::HashMap<Key, Value> {
self.map
.iter()
.map(|(key, expiring_el)| (key.clone(), expiring_el.element.clone()))
.collect()
}
}

#[allow(dead_code)]
pub struct DuplicateCache<Key>(TimeCache<Key, ()>);
pub struct DuplicateCache<Key: std::hash::Hash>(TimeCache<Key, ()>);

#[allow(dead_code)]
impl<Key> DuplicateCache<Key>
Expand Down Expand Up @@ -296,4 +349,14 @@ mod test {
// should be removed from the cache
assert!(cache.insert("t"));
}

#[test]
fn test_remove() {
let mut cache = TimeCache::new(Duration::from_secs(10));

cache.insert("t", "");
cache.insert("e", "");
cache.remove("e");
assert!(!cache.contains_key(&"e"));
}
}
1 change: 1 addition & 0 deletions mm2src/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"]
base64 = "0.11.0"
bytes = "0.5.4"
byteorder = "1.3.2"
common = { path = "../common" }
fnv = "1.0.6"
futures = "0.3.1"
futures_codec = "0.4.0"
Expand Down
2 changes: 1 addition & 1 deletion mm2src/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::handler::GossipsubHandler;
use crate::mcache::MessageCache;
use crate::protocol::{GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId};
use crate::time_cache::{Entry as TimeCacheEntry, TimeCache};
use crate::topic::{Topic, TopicHash};
use common::time_cache::{Entry as TimeCacheEntry, TimeCache};
use futures::prelude::*;
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler};
Expand Down
1 change: 0 additions & 1 deletion mm2src/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ mod behaviour;
mod config;
mod handler;
mod mcache;
mod time_cache;
mod topic;

mod rpc_proto {
Expand Down
6 changes: 4 additions & 2 deletions mm2src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use std::str;
#[cfg(not(target_arch = "wasm32"))]
use crate::mm2::database::init_and_migrate_db;
use crate::mm2::lp_network::{lp_ports, p2p_event_process_loop, P2PContext};
use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, lp_ordermatch_loop, orders_kick_start,
BalanceUpdateOrdermatchHandler};
use crate::mm2::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, lp_ordermatch_loop,
orders_kick_start, BalanceUpdateOrdermatchHandler};
use crate::mm2::lp_swap::{running_swaps_num, swap_kick_starts};
use crate::mm2::rpc::spawn_rpc;
use crate::mm2::{MM_DATETIME, MM_VERSION};
Expand Down Expand Up @@ -344,6 +344,8 @@ pub async fn lp_init(ctx: MmArc) -> Result<(), String> {

spawn(broadcast_maker_orders_keep_alive_loop(ctx.clone()));

spawn(clean_memory_loop(ctx.clone()));

let ctx_id = try_s!(ctx.ffi_handle());

spawn_rpc(ctx_id);
Expand Down
Loading