Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6ad60e9
add recently cancelled orders time cache to orderbook
shamardy Sep 28, 2024
232019b
revert test_cancel_order changes
shamardy Sep 28, 2024
1c38da7
review fix: cherry-pick 62d26c7
mariocynicys Sep 4, 2024
da884fc
optimize `clear_expired_entries`
shamardy Sep 30, 2024
5762d13
provide a consistent interface for `ExpirableMap`
shamardy Sep 30, 2024
f2ef6d3
use Copy type parameter instead of Clone in `ExpirableMap`
shamardy Sep 30, 2024
87c53f8
make test_cancel_all_orders not rely on the order of messages
shamardy Oct 1, 2024
5591f89
Make `RECENTLY_CANCELLED_TIMEOUT` `Duration` Type
shamardy Oct 1, 2024
69fdfe9
use ctx as reference in `maker_order_cancelled_p2p_notify`
shamardy Oct 1, 2024
14f7f34
move `recently_cancelled` time cache to it's own mutex in `Ordermatch…
shamardy Oct 1, 2024
e0e5d41
Revert "move `recently_cancelled` time cache to it's own mutex in `Or…
shamardy Oct 2, 2024
cdfd809
move recently cancelled back to orderbook struct and handle it in a b…
shamardy Oct 2, 2024
860544f
review fix: clarify some comments
shamardy Oct 2, 2024
20fa7e5
add order to `recently_cancelled` even if it was in the `order_set`
shamardy Oct 2, 2024
d2675d2
review fix: add implementation details to `clear_expired_entries`
shamardy Oct 2, 2024
abfb252
review fix: check expiry on remove as well
shamardy Oct 2, 2024
5a839be
review fix: move implementation details to a comment inside the `clea…
shamardy Oct 2, 2024
598045d
revert `test_cancel_all_orders` to dev state and add `test_order_canc…
shamardy Oct 3, 2024
9faa804
fix clippy
shamardy Oct 3, 2024
158252c
review fix: remove return types from some functions that don't need them
shamardy Oct 3, 2024
ff54b83
review fix: fix comments in `test_order_cancellation_received_before_…
shamardy Oct 3, 2024
3f0a6bd
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 3, 2024
b1cae0f
reorder imports after merge
shamardy Oct 3, 2024
9dd4544
remove adex-cli CI workflow
shamardy Oct 4, 2024
d55efc3
update `mm2_libp2p` dependency name for consistency
shamardy Oct 4, 2024
1e6bbb8
Merge remote-tracking branch 'origin/dev' into fix-p2p-cancellation
shamardy Oct 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 3 additions & 14 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,7 @@ impl WebsocketTransport {
}
}

async fn handle_keepalive(
&self,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
expires_at: Option<Instant>,
) -> OuterAction {
async fn handle_keepalive(&self, wsocket: &mut WebSocketStream, expires_at: Option<Instant>) -> OuterAction {
const SIMPLE_REQUEST: &str = r#"{"jsonrpc":"2.0","method":"net_version","params":[],"id": 0 }"#;

if let Some(expires_at) = expires_at {
Expand All @@ -112,10 +107,7 @@ impl WebsocketTransport {
}
}

// Drop expired response notifier channels
response_notifiers.clear_expired_entries();
Comment thread
shamardy marked this conversation as resolved.

let mut should_continue = Default::default();
let mut should_continue = false;
for _ in 0..MAX_ATTEMPTS {
match wsocket
.send(tokio_tungstenite_wasm::Message::Text(SIMPLE_REQUEST.to_string()))
Expand Down Expand Up @@ -206,9 +198,6 @@ impl WebsocketTransport {
}

if let Some(id) = inc_event.get("id") {
// just to ensure we don't have outdated entries
response_notifiers.clear_expired_entries();

let request_id = id.as_u64().unwrap_or_default() as usize;

if let Some(notifier) = response_notifiers.remove(&request_id) {
Comment thread
mariocynicys marked this conversation as resolved.
Expand Down Expand Up @@ -279,7 +268,7 @@ impl WebsocketTransport {
loop {
futures_util::select! {
_ = keepalive_interval.next().fuse() => {
match self.handle_keepalive(&mut wsocket, &mut response_notifiers, expires_at).await {
match self.handle_keepalive(&mut wsocket, expires_at).await {
OuterAction::None => {},
OuterAction::Continue => continue,
OuterAction::Break => break,
Expand Down
90 changes: 64 additions & 26 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use instant::{Duration, Instant};
use rustc_hash::FxHashMap;
use std::hash::Hash;
use std::{collections::BTreeMap, hash::Hash};

#[derive(Clone, Debug)]
pub struct ExpirableEntry<V> {
Expand Down Expand Up @@ -36,45 +36,83 @@ impl<V> ExpirableEntry<V> {
pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl }
}

impl<K: Eq + Hash, V> Default for ExpirableMap<K, V> {
impl<K: Eq + Hash + Copy, V> Default for ExpirableMap<K, V> {
fn default() -> Self { Self::new() }
}

/// A map that allows associating values with keys and expiring entries.
/// It is important to note that this implementation does not automatically
/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries`
/// at specified intervals.
/// It is important to note that this implementation does not have a background worker to
/// automatically clear expired entries. Outdated entries are only removed when the control flow
/// is handed back to the map mutably (i.e. some mutable method of the map is invoked).
///
/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap`
/// under the hood and is not suitable for cryptographic purposes.
#[derive(Clone, Debug)]
pub struct ExpirableMap<K: Eq + Hash, V>(FxHashMap<K, ExpirableEntry<V>>);
pub struct ExpirableMap<K: Eq + Hash + Copy, V> {
map: FxHashMap<K, ExpirableEntry<V>>,
/// A sorted inverse map from expiration times to keys to speed up expired entries clearing.
expiries: BTreeMap<Instant, K>,
}

impl<K: Eq + Hash, V> ExpirableMap<K, V> {
impl<K: Eq + Hash + Copy, V> ExpirableMap<K, V> {
/// Creates a new empty `ExpirableMap`
#[inline]
pub fn new() -> Self { Self(FxHashMap::default()) }
pub fn new() -> Self {
Self {
map: FxHashMap::default(),
expiries: BTreeMap::new(),
}
}

/// Returns the associated value if present and not expired.
#[inline]
pub fn get(&self, k: &K) -> Option<&V> {
self.map
.get(k)
.filter(|v| v.expires_at > Instant::now())
.map(|v| &v.value)
}

/// Returns the associated value if present.
/// Removes a key-value pair from the map and returns the associated value if present and not expired.
#[inline]
pub fn get(&mut self, k: &K) -> Option<&V> { self.0.get(k).map(|v| &v.value) }
pub fn remove(&mut self, k: &K) -> Option<V> {
self.map.remove(k).filter(|v| v.expires_at > Instant::now()).map(|v| {
self.expiries.remove(&v.expires_at);
v.value
})
}

/// Inserts a key-value pair with an expiration duration.
///
/// If a value already exists for the given key, it will be updated and then
/// the old one will be returned.
pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option<V> {
self.clear_expired_entries();
Comment thread
shamardy marked this conversation as resolved.
let entry = ExpirableEntry::new(v, exp);

self.0.insert(k, entry).map(|v| v.value)
self.expiries.insert(entry.expires_at, k);
self.map.insert(k, entry).map(|v| v.value)
}

/// Removes expired entries from the map.
pub fn clear_expired_entries(&mut self) { self.0.retain(|_k, v| Instant::now() < v.expires_at); }

/// Removes a key-value pair from the map and returns the associated value if present.
#[inline]
pub fn remove(&mut self, k: &K) -> Option<V> { self.0.remove(k).map(|v| v.value) }
///
/// Iterates through the `expiries` in order, removing entries that have expired.
/// Stops at the first non-expired entry, leveraging the sorted nature of `BTreeMap`.
fn clear_expired_entries(&mut self) {
let now = Instant::now();

// `pop_first()` is used here as it efficiently removes expired entries.
// `first_key_value()` was considered as it wouldn't need re-insertion for
// non-expired entries, but it would require an extra remove operation for
// each expired entry. `pop_first()` needs only one re-insertion per call,
// which is an acceptable trade-off compared to multiple remove operations.
while let Some((exp, key)) = self.expiries.pop_first() {
if exp > now {
self.expiries.insert(exp, key);
Comment thread
shamardy marked this conversation as resolved.
break;
}
self.map.remove(&key);
}
}
}

#[cfg(any(test, target_arch = "wasm32"))]
Expand All @@ -94,8 +132,8 @@ mod tests {
let exp = Duration::from_secs(1);

// Insert 2 entries with 1 sec expiration time
expirable_map.insert("key1".to_string(), value.to_string(), exp);
expirable_map.insert("key2".to_string(), value.to_string(), exp);
expirable_map.insert("key1", value, exp);
expirable_map.insert("key2", value, exp);

// Wait for entries to expire
Timer::sleep(2.).await;
Expand All @@ -104,14 +142,14 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, so we shouldn't have any entry accessible
assert_eq!(expirable_map.0.len(), 0);
assert_eq!(expirable_map.map.len(), 0);

// Insert 5 entries
expirable_map.insert("key1".to_string(), value.to_string(), Duration::from_secs(5));
expirable_map.insert("key2".to_string(), value.to_string(), Duration::from_secs(4));
expirable_map.insert("key3".to_string(), value.to_string(), Duration::from_secs(7));
expirable_map.insert("key4".to_string(), value.to_string(), Duration::from_secs(2));
expirable_map.insert("key5".to_string(), value.to_string(), Duration::from_millis(3750));
expirable_map.insert("key1", value, Duration::from_secs(5));
expirable_map.insert("key2", value, Duration::from_secs(4));
expirable_map.insert("key3", value, Duration::from_secs(7));
expirable_map.insert("key4", value, Duration::from_secs(2));
expirable_map.insert("key5", value, Duration::from_millis(3750));

// Wait 2 seconds to expire some entries
Timer::sleep(2.).await;
Expand All @@ -120,6 +158,6 @@ mod tests {
expirable_map.clear_expired_entries();

// We waited for 2 seconds, only one entry should expire
assert_eq!(expirable_map.0.len(), 4);
assert_eq!(expirable_map.map.len(), 4);
});
}
1 change: 1 addition & 0 deletions mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ lazy_static = "1.4"
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
primitives = { path = "../mm2_bitcoin/primitives" }
rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] }
serde = "1"
Expand Down
2 changes: 0 additions & 2 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub async fn send_asked_data_rpc(
asked_data: SendAskedDataRequest,
) -> Result<bool, MmError<SendAskedDataError>> {
let mut awaiting_asks = ctx.data_asker.awaiting_asks.lock().await;
awaiting_asks.clear_expired_entries();

match awaiting_asks.remove(&asked_data.data_id) {
Comment thread
mariocynicys marked this conversation as resolved.
Some(sender) => {
sender.send(asked_data.data).map_to_mm(|_| {
Expand Down
3 changes: 2 additions & 1 deletion mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
use mm2_libp2p::PeerAddress;
use mm2_metrics::{MetricsArc, MetricsOps};
use primitives::hash::H160;
use rand::Rng;
Expand Down Expand Up @@ -145,7 +146,7 @@ pub struct MmCtx {
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_gui_storage = { path = "../mm2_gui_storage" }
mm2_io = { path = "../mm2_io" }
mm2-libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_net = { path = "../mm2_net", features = ["event-stream", "p2p"] }
mm2_number = { path = "../mm2_number" }
Expand Down
74 changes: 4 additions & 70 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, TopicPrefix};
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use mm2_net::p2p::P2PContext;
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Mutex;

use crate::lp_network::broadcast_p2p_msg;
Expand All @@ -37,71 +36,6 @@ pub(crate) struct HealthcheckMessage {
data: HealthcheckData,
}

/// Wrapper of `libp2p::PeerId` with trait additional implementations.
///
/// TODO: This should be used as a replacement of `libp2p::PeerId` in the entire project.
#[derive(Clone, Copy, Debug, Display, PartialEq)]
pub struct PeerAddress(mm2_libp2p::PeerId);

impl From<mm2_libp2p::PeerId> for PeerAddress {
fn from(value: mm2_libp2p::PeerId) -> Self { Self(value) }
}

impl From<PeerAddress> for mm2_libp2p::PeerId {
fn from(value: PeerAddress) -> Self { value.0 }
}

impl Serialize for PeerAddress {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.0.to_string())
}
}

impl<'de> Deserialize<'de> for PeerAddress {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PeerAddressVisitor;

impl<'de> serde::de::Visitor<'de> for PeerAddressVisitor {
type Value = PeerAddress;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a string representation of peer id.")
}

fn visit_str<E>(self, value: &str) -> Result<PeerAddress, E>
where
E: serde::de::Error,
{
if value.len() > 100 {
return Err(serde::de::Error::invalid_length(
value.len(),
&"peer id cannot exceed 100 characters.",
));
}

Ok(mm2_libp2p::PeerId::from_str(value)
.map_err(serde::de::Error::custom)?
.into())
}

fn visit_string<E>(self, value: String) -> Result<PeerAddress, E>
where
E: serde::de::Error,
{
self.visit_str(&value)
}
}

deserializer.deserialize_str(PeerAddressVisitor)
}
}

#[derive(Debug, Display)]
enum SignValidationError {
#[display(
Expand Down Expand Up @@ -331,8 +265,7 @@ pub async fn peer_connection_healthcheck_rpc(

{
let mut book = ctx.healthcheck_response_handler.lock().await;
book.clear_expired_entries();
book.insert(target_peer_address.to_string(), tx, address_record_exp);
book.insert(target_peer_address, tx, address_record_exp);
}

broadcast_p2p_msg(
Expand Down Expand Up @@ -395,7 +328,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.healthcheck_response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer.to_string()) {
if let Some(tx) = response_handler.remove(&sender_peer) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
};
Expand All @@ -409,6 +342,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
#[cfg(any(test, target_arch = "wasm32"))]
mod tests {
use std::mem::discriminant;
use std::str::FromStr;

use super::*;
use common::cross_test;
Expand Down
Loading