diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index af01024734..ffe68bad58 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -123,7 +123,7 @@ jobs: - name: Test run: | wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.sh | bash - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast mac-x86-64-kdf-integration: timeout-minutes: 90 @@ -155,7 +155,7 @@ jobs: - name: Test run: | wget -O - https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.sh | bash - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast win-x86-64-kdf-integration: timeout-minutes: 90 @@ -191,7 +191,7 @@ jobs: - name: Test run: | Invoke-WebRequest -Uri https://raw.githubusercontent.com/KomodoPlatform/komodo/635112d590618165a152dfa0f31e95a9be39a8f6/zcutil/fetch-params-alt.bat -OutFile \cmd.bat && \cmd.bat - cargo test --test 'mm2_tests_main' --no-fail-fast + cargo test --test 'mm2_tests_main' --features for-tests --no-fail-fast docker-tests: timeout-minutes: 90 diff --git a/mm2src/mm2_main/Cargo.toml b/mm2src/mm2_main/Cargo.toml index fb15d8b373..fcb668ee01 100644 --- a/mm2src/mm2_main/Cargo.toml +++ b/mm2src/mm2_main/Cargo.toml @@ -17,7 +17,7 @@ custom-swap-locktime = [] # only for testing purposes, should never be activated native = [] # Deprecated track-ctx-pointer = ["common/track-ctx-pointer"] zhtlc-native-tests = ["coins/zhtlc-native-tests"] -run-docker-tests = ["coins/run-docker-tests"] +run-docker-tests = ["for-tests", "coins/run-docker-tests"] default = [] trezor-udp = ["crypto/trezor-udp"] # use for tests to connect to trezor emulator over udp run-device-tests = [] @@ -29,7 +29,12 @@ new-db-arch = ["mm2_core/new-db-arch"] # A temporary feature to integrate the ne # Temporary feature for implementing IBC wrap/unwrap mechanism and will be removed # once we consider it as stable. ibc-routing-for-swaps = [] -for-tests = [] +for-tests = [ + "coins/for-tests", + "coins_activation/for-tests", + "common/for-tests", + "trading_api/for-tests" +] [dependencies] async-std = { workspace = true, features = ["unstable"] } @@ -156,3 +161,13 @@ gstuff.workspace = true prost-build = { version = "0.12", default-features = false } regex.workspace = true +[[test]] +name = "docker_tests_main" +path = "tests/docker_tests_main.rs" +required-features = ["run-docker-tests"] + +[[test]] +name = "mm2_tests_main" +path = "tests/mm2_tests_main.rs" +required-features = ["for-tests"] + diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 90cb215881..a08814e647 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -134,9 +134,9 @@ pub mod ordermatch_tests; mod ordermatch_wasm_db; pub const ORDERBOOK_PREFIX: TopicPrefix = "orbk"; -#[cfg(not(test))] +#[cfg(not(any(test, feature = "for-tests")))] pub const MIN_ORDER_KEEP_ALIVE_INTERVAL: u64 = 30; -#[cfg(test)] +#[cfg(any(test, feature = "for-tests"))] pub const MIN_ORDER_KEEP_ALIVE_INTERVAL: u64 = 5; const BALANCE_REQUEST_INTERVAL: f64 = 30.; const MAKER_ORDER_TIMEOUT: u64 = MIN_ORDER_KEEP_ALIVE_INTERVAL * 3; @@ -158,7 +158,7 @@ const SWAP_VERSION_DEFAULT: u8 = 2; pub type OrderbookP2PHandlerResult = Result<(), MmError>; -#[derive(Display)] +#[derive(Debug, Display)] pub enum OrderbookP2PHandlerError { #[display(fmt = "'{_0}' is an invalid topic for the orderbook handler.")] InvalidTopic(String), @@ -176,11 +176,32 @@ pub enum OrderbookP2PHandlerError { OrderNotFound(Uuid), Internal(String), + + #[display( + fmt = "Received stale keep alive from pubkey '{from_pubkey}' propagated from '{propagated_from}', will ignore it" + )] + StaleKeepAlive { + from_pubkey: String, + propagated_from: String, + }, + + #[display( + fmt = "Sync failure for pubkey '{from_pubkey}' via '{propagated_from}'; unresolved pairs: {unresolved_pairs:?}" + )] + SyncFailure { + from_pubkey: String, + propagated_from: String, + unresolved_pairs: Vec, + }, } impl OrderbookP2PHandlerError { pub(crate) fn is_warning(&self) -> bool { - matches!(self, OrderbookP2PHandlerError::OrderNotFound(_)) + // treat SyncFailure as a warning for now due to outdated nodes + matches!( + self, + OrderbookP2PHandlerError::OrderNotFound(_) | OrderbookP2PHandlerError::SyncFailure { .. } + ) } } @@ -310,50 +331,162 @@ fn process_trie_delta( new_root } +fn build_pubkey_state_sync_request( + pubkey: &str, + pending_pairs: &HashSet, + expected_pair_roots: &HashMap, +) -> OrdermatchRequest { + let trie_roots = pending_pairs + .iter() + .filter_map(|p| expected_pair_roots.get(p).map(|&root| (p.clone(), root))) + .collect(); + OrdermatchRequest::SyncPubkeyOrderbookState { + pubkey: pubkey.to_owned(), + trie_roots, + } +} + +fn apply_pair_orders_diff( + orderbook: &mut Orderbook, + from_pubkey: &str, + pair: &AlbOrderedOrderbookPair, + diff: DeltaOrFullTrie, + protocol_infos: &HashMap, + conf_infos: &HashMap, +) -> H64 { + let params = ProcessTrieParams { + pubkey: from_pubkey, + alb_pair: pair, + protocol_infos, + conf_infos, + }; + match diff { + DeltaOrFullTrie::Delta(delta) => process_trie_delta(orderbook, delta, params), + DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(orderbook, values, params), + } +} + +fn apply_and_validate_pubkey_state_sync_response( + orderbook_mutex: &PaMutex, + from_pubkey: &str, + peer: &str, + response: SyncPubkeyOrderbookStateRes, + expected_pair_roots: &HashMap, + pending_pairs: &mut HashSet, + keep_alive_timestamp: u64, +) { + let mut orderbook = orderbook_mutex.lock(); + for (pair, diff) in response.pair_orders_diff { + // Ignore unsolicited pairs we didn't request to prevent state poisoning. + if !pending_pairs.contains(&pair) { + continue; + } + + let new_root = apply_pair_orders_diff( + &mut orderbook, + from_pubkey, + &pair, + diff, + &response.protocol_infos, + &response.conf_infos, + ); + + if let Some(expected) = expected_pair_roots.get(&pair) { + if &new_root == expected { + pending_pairs.remove(&pair); + // Mark per-pair maker-published timestamp once accepted + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(pair.clone(), keep_alive_timestamp); + state.pair_last_seen_local.insert(pair.clone(), now_sec()); + } else { + warn!( + "Sync validation failed for pubkey {} pair {} from {}: expected {:?}, got {:?}. Reverting pair.", + from_pubkey, pair, peer, expected, new_root + ); + remove_pubkey_pair_orders(&mut orderbook, from_pubkey, &pair); + } + } + } +} + +async fn request_and_apply_pubkey_state_sync_from_peer( + ctx: &MmArc, + orderbook: &PaMutex, + from_pubkey: &str, + peer: &str, + expected_pair_roots: &HashMap, + pending_pairs: &mut HashSet, + keep_alive_timestamp: u64, +) -> OrderbookP2PHandlerResult { + let current_req = build_pubkey_state_sync_request(from_pubkey, pending_pairs, expected_pair_roots); + + if let Some(resp) = request_one_peer::( + ctx.clone(), + P2PRequest::Ordermatch(current_req), + peer.to_string(), + ) + .await + .map_mm_err()? + { + apply_and_validate_pubkey_state_sync_response( + orderbook, + from_pubkey, + peer, + resp, + expected_pair_roots, + pending_pairs, + keep_alive_timestamp, + ); + } + + Ok(()) +} + async fn process_orders_keep_alive( ctx: MmArc, - propagated_from_peer: String, + propagated_from: String, from_pubkey: String, keep_alive: new_protocol::PubkeyKeepAlive, i_am_relay: bool, ) -> OrderbookP2PHandlerResult { + let keep_alive_timestamp = keep_alive.timestamp; let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("from_ctx failed"); - let to_request = ordermatch_ctx - .orderbook - .lock() - .process_keep_alive(&from_pubkey, keep_alive, i_am_relay); - - let req = match to_request { - Some(req) => req, - // The message was processed, simply forward it - None => return Ok(()), - }; - let response = request_one_peer::( - ctx.clone(), - P2PRequest::Ordermatch(req), - propagated_from_peer.clone(), + // Update local state and decide whether to sync. + let trie_roots_to_request = + ordermatch_ctx + .orderbook + .lock() + .process_keep_alive(&from_pubkey, keep_alive, i_am_relay, &propagated_from)?; + + if trie_roots_to_request.is_empty() { + // The message was processed, return Ok to forward it + return Ok(()); + } + + // Query ONLY the keepalive propagator. + let mut remaining_pairs: HashSet = trie_roots_to_request.keys().cloned().collect(); + let _ = request_and_apply_pubkey_state_sync_from_peer( + &ctx, + &ordermatch_ctx.orderbook, + &from_pubkey, + &propagated_from, + &trie_roots_to_request, + &mut remaining_pairs, + keep_alive_timestamp, ) - .await - .map_mm_err()? - .ok_or_else(|| { - MmError::new(OrderbookP2PHandlerError::P2PRequestError(format!( - "No response was received from peer {propagated_from_peer} for SyncPubkeyOrderbookState request!" - ))) - })?; - - let mut orderbook = ordermatch_ctx.orderbook.lock(); - for (pair, diff) in response.pair_orders_diff { - let params = ProcessTrieParams { - pubkey: &from_pubkey, - alb_pair: &pair, - protocol_infos: &response.protocol_infos, - conf_infos: &response.conf_infos, - }; - let _new_root = match diff { - DeltaOrFullTrie::Delta(delta) => process_trie_delta(&mut orderbook, delta, params), - DeltaOrFullTrie::FullTrie(values) => process_pubkey_full_trie(&mut orderbook, values, params), - }; + .await; + + // Phase 4: Finalize; if unresolved, mark unsynced and DO NOT FORWARD. + if !remaining_pairs.is_empty() { + let unresolved_pairs = remaining_pairs.into_iter().collect::>(); + return MmError::err(OrderbookP2PHandlerError::SyncFailure { + from_pubkey, + propagated_from, + unresolved_pairs, + }); } Ok(()) @@ -395,28 +528,38 @@ fn process_maker_order_cancelled(ctx: &MmArc, from_pubkey: String, cancelled_msg orderbook .recently_cancelled .insert_expirable(uuid, from_pubkey.clone(), RECENTLY_CANCELLED_TIMEOUT); - if let Some(order) = orderbook.order_set.get(&uuid) { - if order.pubkey == from_pubkey { - orderbook.remove_order_trie_update(uuid); + + let maybe_pair = orderbook + .order_set + .get(&uuid) + .filter(|o| o.pubkey == from_pubkey) + .map(|o| alb_ordered_pair(&o.base, &o.rel)); + + if let Some(alb_pair) = maybe_pair { + orderbook.remove_order_trie_update(uuid); + + // Advance the per-pair maker timestamp floor to the cancel timestamp + // so any earlier keep-alive for this pair will be rejected as stale. + // + // Note: we do NOT refresh liveness (pair_last_seen_local) here. + // Liveness is refreshed only when we accept state-carrying messages + // (e.g., a keep-alive that matches the expected root, a validated sync, + // a MakerOrderCreated/Updated, or a full-trie fill), not on cancel. + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, &from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), cancelled_msg.timestamp); + + // If this cancel emptied the pair (root is zero/hashed-null), drop liveness for this pair. + // This mirrors the zero-root keep-alive path and allows faster GC of empty pairs. + if let Some(&pair_root) = state.trie_roots.get(&alb_pair) { + if pair_root == H64::default() || pair_root == hashed_null_node::() { + state.pair_last_seen_local.remove(&alb_pair); + } } } } -// fn verify_pubkey_orderbook(orderbook: &GetOrderbookPubkeyItem) -> Result<(), String> { -// let keys: Vec<(_, _)> = orderbook -// .orders -// .iter() -// .map(|(uuid, order)| { -// let order_bytes = rmp_serde::to_vec(&order).expect("Serialization should never fail"); -// (uuid.as_bytes(), Some(order_bytes)) -// }) -// .collect(); -// let (orders_root, proof) = &orderbook.pair_orders_trie_root; -// verify_trie_proof::(orders_root, proof, &keys) -// .map_err(|e| ERRL!("Error on pair_orders_trie_root verification: {}", e))?; -// Ok(()) -// } - // Some coins, for example ZHTLC, have privacy features like random keypair to sign P2P messages per every order. // So, each order of such coin has unique «pubkey» field that doesn’t match node persistent pubkey derived from passphrase. // We can compare pubkeys from maker_orders and from asks or bids, to find our order. @@ -496,6 +639,9 @@ async fn request_and_fill_orderbook(ctx: &MmArc, base: &str, rel: &str) -> Resul conf_infos: &conf_infos, }; let _new_root = process_pubkey_full_trie(&mut orderbook, orders, params); + + let state = pubkey_state_mut(&mut orderbook.pubkeys_state, &pubkey); + state.pair_last_seen_local.insert(alb_pair.clone(), now_sec()); } let topic = orderbook_topic_from_base_rel(base, rel); @@ -808,8 +954,10 @@ fn process_get_orderbook_request(ctx: MmArc, base: String, rel: String) -> Resul pubkey ))?; + let pubkey_last_seen = pubkey_state.pair_last_seen_local.values().max().copied().unwrap_or(0); + let item = GetOrderbookPubkeyItem { - last_keep_alive: pubkey_state.last_keep_alive, + last_keep_alive: pubkey_last_seen, orders, // TODO save last signed payload to pubkey state last_signed_pubkey_payload: vec![], @@ -921,10 +1069,9 @@ impl DeltaOrFull return Ok(DeltaOrFullTrie::Delta(total_delta)); } - log::warn!( + warn!( "History started from {:?} ends with not up-to-date trie root {:?}", - from_hash, - actual_trie_root + from_hash, actual_trie_root ); } @@ -1162,7 +1309,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { None => return, }; if coin.wallet_only(&ctx) { - log::warn!( + warn!( "coin: {} is wallet only, skip BalanceTradeFeeUpdatedHandler", coin.ticker() ); @@ -1174,7 +1321,7 @@ impl BalanceTradeFeeUpdatedHandler for BalanceUpdateOrdermatchHandler { Ok(vol_info) => vol_info.volume, Err(e) if e.get_inner().not_sufficient_balance() => MmNumber::from(0), Err(e) => { - log::warn!("Couldn't handle the 'balance_updated' event: {}", e); + warn!("Couldn't handle the 'balance_updated' event: {}", e); return; }, }; @@ -2444,10 +2591,6 @@ fn broadcast_keep_alive_for_pub(ctx: &MmArc, pubkey: &str, orderbook: &Orderbook }; for (alb_pair, root) in state.trie_roots.iter() { - if *root == H64::default() && *root == hashed_null_node::() { - continue; - } - let message = new_protocol::PubkeyKeepAlive { trie_roots: HashMap::from([(alb_pair.clone(), *root)]), timestamp: now_sec(), @@ -2582,12 +2725,20 @@ impl TrieDiffHistory { type TrieOrderHistory = TrieDiffHistory; struct OrderbookPubkeyState { - /// Local receive time (seconds) when we last accepted a keep-alive from this pubkey. - /// Used by inactivity GC to purge stale pubkeys and their orders. - last_keep_alive: u64, - /// Monotonic maker-published timestamp of the last processed PubkeyKeepAlive. - /// Used to ignore out-of-order or replayed keep-alive messages from this pubkey. - latest_maker_timestamp: u64, + /// Monotonic maker-published timestamp for the last accepted root per pair. + /// Used to ignore out-of-order or replayed keep-alive roots for specific pairs. + /// + /// Retention policy: + /// We intentionally retain entries even after a pair is pruned by inactivity GC to defend against + /// stale replays resurrecting old state. These entries are dropped only when the entire pubkey + /// state is removed. + latest_root_timestamp_by_pair: HashMap, + /// Local receive time (seconds) of the last accepted keep‑alive per alphabetically ordered pair + /// owned by this pubkey. This is the authoritative liveness signal used by inactivity GC and + /// trie‑state pruning. Pairs not updated within the timeout are purged; if all pairs are stale, + /// the entire pubkey state is removed. + /// Key: `AlbOrderedOrderbookPair` ("BASE:REL"), Value: local Unix time in seconds. + pair_last_seen_local: HashMap, /// The map storing historical data about specific pair subtrie changes /// Used to get diffs of orders of pair between specific root hashes order_pairs_trie_state_history: TimedMap, @@ -2600,10 +2751,8 @@ struct OrderbookPubkeyState { impl OrderbookPubkeyState { pub fn new() -> OrderbookPubkeyState { OrderbookPubkeyState { - // Keep `last_keep_alive` based on local receive time. This is used for cleaning up orders of an inactive pubkey. - last_keep_alive: now_sec(), - // Start at 0 so the first message from this pubkey always passes the monotonic check. - latest_maker_timestamp: 0, + latest_root_timestamp_by_pair: HashMap::default(), + pair_last_seen_local: HashMap::default(), order_pairs_trie_state_history: TimedMap::new_with_map_kind(MapKind::FxHashMap), orders_uuids: HashSet::default(), trie_roots: HashMap::default(), @@ -2739,6 +2888,8 @@ impl Orderbook { let pair_root = order_pair_root_mut(&mut pubkey_state.trie_roots, &alb_ordered); let prev_root = *pair_root; + pubkey_state.pair_last_seen_local.insert(alb_ordered.clone(), now_sec()); + pubkey_state.orders_uuids.insert((order.uuid, alb_ordered.clone())); { @@ -2917,25 +3068,84 @@ impl Orderbook { self.topics_subscribed_to.contains_key(topic) } + /// Processes a [`new_protocol::PubkeyKeepAlive`] for `from_pubkey`, updating per‑pair state and + /// deciding whether a state sync is required. + /// + /// This function performs only local state transitions; it does no I/O. + /// It may: + /// - Accept the announcement (per pair) and refresh timestamps if the announced root matches the + /// local root. + /// - Clear a pair if the announced root is null/empty (remove all orders, remember the null root, + /// update the maker timestamp floor, and drop the local “last seen” so GC can prune it). + /// - Detect divergence and return the set of pairs that must be synced to the announced roots. + /// + /// Invariants and validation: + /// - Maker timestamps are checked per pair and must be strictly increasing. If any pair is stale + /// (announced timestamp ≤ the last accepted maker timestamp for that pair), the function returns + /// [`OrderbookP2PHandlerError::StaleKeepAlive`] and no state is advanced for that message. Callers + /// must not propagate such a message. + /// - We update `pair_last_seen_local` only for pairs that are accepted/in‑sync. Divergent pairs do + /// not refresh liveness until validated to the expected root. + /// + /// Subscription policy: + /// - Pairs we are not subscribed to are ignored unless this node acts as a relay (`i_am_relay`). + /// + /// Returns: + /// - A map of `"BASE:REL"` to the expected roots (as announced) for which local state diverges and + /// a sync is required. An empty map means no sync is needed and the message may be propagated. + /// + /// Caller responsibilities: + /// - If the returned map is non‑empty, fetch trie diffs from the peer that propagated the keep‑alive + /// (`propagated_from`), validate them to the expected roots, apply, and only then consider forwarding + /// the keep‑alive. + /// + /// Errors: + /// - [`OrderbookP2PHandlerError::StaleKeepAlive`] if any per‑pair maker timestamp regresses or repeats. + /// - Other variants for malformed or otherwise invalid data. + /// + /// Notes: + /// - Maker timestamps are used for monotonicity/anti‑replay; GC uses local receive times. + /// - The caller must ignore Unsolicited pairs in sync responses when applying the response. fn process_keep_alive( &mut self, from_pubkey: &str, message: new_protocol::PubkeyKeepAlive, i_am_relay: bool, - ) -> Option { + propagated_from: &str, + ) -> Result, MmError> { + // TODO(rate-limit): + // Add a single, shared in‑process rate limiter for OrdermatchMessage + // types (e.g., a module in mm2_p2p or lp_network). + // For keep-alive, enforce a minimum interval per pubkey (>= 20–30s) or X messages/minute. + // When throttled, early‑return without syncing/propagating, before any state mutation. + + // Pre-scan: if any single pair is stale => treat the whole message as stale. + // Note: We currently send keep-alives per pair (trie_roots has a single entry), + // so this is equivalent to checking that one pair. If multi-pair keep-alives return, + // demote only stale pairs instead of rejecting the whole message. { let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); - if message.timestamp <= pubkey_state.latest_maker_timestamp { - log::debug!( - "Ignoring PubkeyKeepAlive from {}: message.timestamp={} <= last_processed_timestamp={} (stale/replayed)", - from_pubkey, - message.timestamp, - pubkey_state.latest_maker_timestamp - ); - return None; + for (alb_pair, _) in message.trie_roots.iter() { + let subscribed = self + .topics_subscribed_to + .contains_key(&orderbook_topic_from_ordered_pair(alb_pair)); + if !subscribed && !i_am_relay { + continue; + } + + let last_pair_timestamp = *pubkey_state.latest_root_timestamp_by_pair.get(alb_pair).unwrap_or(&0); + + if message.timestamp <= last_pair_timestamp { + log::debug!( + "Ignoring a stale PubkeyKeepAlive from {} for pair {}: message.timestamp={} <= last_pair_timestamp={}", + from_pubkey, alb_pair, message.timestamp, last_pair_timestamp + ); + return MmError::err(OrderbookP2PHandlerError::StaleKeepAlive { + from_pubkey: from_pubkey.to_owned(), + propagated_from: propagated_from.to_owned(), + }); + } } - pubkey_state.latest_maker_timestamp = message.timestamp; - pubkey_state.last_keep_alive = now_sec(); } let mut trie_roots_to_request = HashMap::new(); @@ -2964,6 +3174,10 @@ impl Orderbook { { let pubkey_state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); pubkey_state.trie_roots.insert(alb_pair.clone(), trie_root); + pubkey_state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), message.timestamp); + pubkey_state.pair_last_seen_local.remove(&alb_pair); } continue; @@ -2976,17 +3190,16 @@ impl Orderbook { }; if current_root != trie_root { trie_roots_to_request.insert(alb_pair, trie_root); + continue; } + let state = pubkey_state_mut(&mut self.pubkeys_state, from_pubkey); + state + .latest_root_timestamp_by_pair + .insert(alb_pair.clone(), message.timestamp); + state.pair_last_seen_local.insert(alb_pair, now_sec()); } - if trie_roots_to_request.is_empty() { - return None; - } - - Some(OrdermatchRequest::SyncPubkeyOrderbookState { - pubkey: from_pubkey.to_owned(), - trie_roots: trie_roots_to_request, - }) + Ok(trie_roots_to_request) } fn orderbook_item_with_proof(&self, order: OrderbookItem) -> OrderbookItemWithProof { @@ -3714,7 +3927,6 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { .expect("CryptoCtx not available") .mm2_internal_pubkey_hex(); - let maker_order_timeout = ctx.conf["maker_order_timeout"].as_u64().unwrap_or(MAKER_ORDER_TIMEOUT); loop { if ctx.is_stopping() { break; @@ -3739,26 +3951,78 @@ pub async fn lp_ordermatch_loop(ctx: MmArc) { } { - // remove "timed out" pubkeys states with their orders from orderbook let mut orderbook = ordermatch_ctx.orderbook.lock(); - let mut uuids_to_remove = vec![]; - let mut pubkeys_to_remove = vec![]; + + let deadline = now_sec().saturating_sub(MAKER_ORDER_TIMEOUT); + + let mut pairs_to_prune = Vec::new(); + let mut pubkeys_to_remove = Vec::new(); + for (pubkey, state) in orderbook.pubkeys_state.iter() { - let is_ours = orderbook.my_p2p_pubkeys.contains(pubkey); - let to_keep = - pubkey == &my_pubsecp || is_ours || state.last_keep_alive + maker_order_timeout > now_sec(); - if !to_keep { - for (uuid, _) in &state.orders_uuids { - uuids_to_remove.push(*uuid); - } + // Skip our own pubkeys; local order lifecycle manages them. + if pubkey == &my_pubsecp || orderbook.my_p2p_pubkeys.contains(pubkey) { + continue; + } + + // Remove the pubkey if no pair has been seen recently. + let any_recent_pair = state + .trie_roots + .keys() + .any(|pair| state.pair_last_seen_local.get(pair).copied().unwrap_or(0) > deadline); + + if !any_recent_pair { pubkeys_to_remove.push(pubkey.clone()); + continue; + } + + // Otherwise keep the pubkey and prune only its stale pairs. + let stale_pairs: Vec<_> = state + .trie_roots + .keys() + .filter_map(|pair| { + let last_seen = state.pair_last_seen_local.get(pair).copied().unwrap_or(0); + (last_seen <= deadline).then(|| pair.clone()) + }) + .collect(); + + if !stale_pairs.is_empty() { + pairs_to_prune.push((pubkey.clone(), stale_pairs)); } } - for uuid in uuids_to_remove { - orderbook.remove_order_trie_update(uuid); + // Prune stale pairs for pubkeys we keep. + for (pubkey, pairs) in pairs_to_prune { + for pair in pairs { + remove_pubkey_pair_orders(&mut orderbook, &pubkey, &pair); + if let Some(state) = orderbook.pubkeys_state.get_mut(&pubkey) { + state.pair_last_seen_local.remove(&pair); + } + } } + + // Remove fully expired pubkeys: delete their remaining orders first, then the state. + // GC: remove orders while the pubkey state still exists to avoid lazy re-creation inside `remove_order_trie_update`. for pubkey in pubkeys_to_remove { + if let Some(orders) = orderbook + .pubkeys_state + .get_mut(&pubkey) + .map(|state| std::mem::take(&mut state.orders_uuids)) + { + for (uuid, _) in orders { + orderbook.remove_order_trie_update(uuid); + } + } + + // TODO(pubkey_replay_guard): + // Block stale replays after full pubkey timeout (incl. restarts). + // - On full pubkey removal: store pubkey -> max maker_ts (write-once HashMap). + // - On keep-alive intake: before creating state, drop if maker_ts <= stored floor. + // - Restarts: persist this map (and ideally per-pair last_maker_ts); until persisted, enforce + // sanity bounds: maker_ts > now + MAX_MAKER_FUTURE_SKEW or now - maker_ts > MAX_MAKER_PAST_AGE => reject. + // - Liveness: keep GC driven by local time; optionally prune when maker_stale || local_stale; + // do not rely solely on maker clock. + // - Cleanup: pre-scan without creating state; centralize timestamp checks/logging; never sync + // origin for messages rejected by guards; add unit tests. orderbook.pubkeys_state.remove(&pubkey); } @@ -4127,7 +4391,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: let order_match = match my_order_entry.get().matches.get(&connected.maker_order_uuid) { Some(o) => o, None => { - log::warn!( + warn!( "Our node doesn't have the match with uuid {}", connected.maker_order_uuid ); @@ -4288,7 +4552,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg let order_match = match my_order.matches.get_mut(&connect_msg.taker_order_uuid) { Some(o) => o, None => { - log::warn!( + warn!( "Our node doesn't have the match with uuid {}", connect_msg.taker_order_uuid ); @@ -4296,7 +4560,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg }, }; if order_match.request.sender_pubkey != sender_pubkey.unprefixed().into() { - log::warn!("Connect message sender pubkey != request message sender pubkey"); + warn!("Connect message sender pubkey != request message sender pubkey"); return; } @@ -5633,7 +5897,7 @@ pub async fn orders_history_by_filter(ctx: MmArc, req: Json) -> Result { - assert!(pairs_trie_roots.contains_key("C1:C2")); - assert!(!pairs_trie_roots.contains_key("C2:C3")); - }, - _ => panic!("Invalid request {:?}", request), - } + let pairs_trie_roots = orderbook + .process_keep_alive(pubkey, message, false, propagated_from) + .unwrap(); + assert!(pairs_trie_roots.contains_key("C1:C2")); + assert!(!pairs_trie_roots.contains_key("C2:C3")); } #[test] @@ -2651,6 +2646,7 @@ fn test_orderbook_pubkey_sync_request_relay() { OrderbookRequestingState::Requested, ); let pubkey = "pubkey"; + let propagated_from = "propagated_from"; let mut trie_roots = HashMap::new(); trie_roots.insert("C1:C2".to_owned(), [1; 8]); @@ -2661,17 +2657,11 @@ fn test_orderbook_pubkey_sync_request_relay() { timestamp: now_sec(), }; - let request = orderbook.process_keep_alive(pubkey, message, true).unwrap(); - match request { - OrdermatchRequest::SyncPubkeyOrderbookState { - trie_roots: pairs_trie_roots, - .. - } => { - assert!(pairs_trie_roots.contains_key("C1:C2")); - assert!(pairs_trie_roots.contains_key("C2:C3")); - }, - _ => panic!("Invalid request {:?}", request), - } + let pairs_trie_roots = orderbook + .process_keep_alive(pubkey, message, true, propagated_from) + .unwrap(); + assert!(pairs_trie_roots.contains_key("C1:C2")); + assert!(pairs_trie_roots.contains_key("C2:C3")); } #[test] diff --git a/mm2src/mm2_main/tests/feature_gate_for_tests.rs b/mm2src/mm2_main/tests/feature_gate_for_tests.rs new file mode 100644 index 0000000000..620cd7e76a --- /dev/null +++ b/mm2src/mm2_main/tests/feature_gate_for_tests.rs @@ -0,0 +1,8 @@ +#![cfg(not(target_arch = "wasm32"))] + +#[cfg(not(any(feature = "for-tests", feature = "run-docker-tests")))] +compile_error!(concat!( + "Integration tests for `", + env!("CARGO_PKG_NAME"), + "` require either the `for-tests` or the `run-docker-tests` feature." +)); diff --git a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs index 71c55fec0f..9f841e7359 100644 --- a/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs +++ b/mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs @@ -1823,7 +1823,6 @@ fn test_order_should_not_be_displayed_when_node_is_down() { "coins": coins, "seednodes": [mm_bob.ip.to_string()], "rpc_password": "pass", - "maker_order_timeout": 5, }), "pass".into(), None, @@ -1870,7 +1869,7 @@ fn test_order_should_not_be_displayed_when_node_is_down() { assert_eq!(asks.len(), 1, "Alice RICK/MORTY orderbook must have exactly 1 ask"); block_on(mm_bob.stop()).unwrap(); - thread::sleep(Duration::from_secs(6)); + thread::sleep(Duration::from_secs(16)); let rc = block_on(mm_alice.rpc(&json! ({ "userpass": mm_alice.userpass, @@ -1910,7 +1909,6 @@ fn test_own_orders_should_not_be_removed_from_orderbook() { "coins": coins, "i_am_seed": true, "rpc_password": "pass", - "maker_order_timeout": 5, "is_bootstrap_node": true }), "pass".into(), @@ -1939,7 +1937,7 @@ fn test_own_orders_should_not_be_removed_from_orderbook() { .unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - thread::sleep(Duration::from_secs(6)); + thread::sleep(Duration::from_secs(16)); let rc = block_on(mm_bob.rpc(&json! ({ "userpass": mm_bob.userpass, @@ -2121,7 +2119,7 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { ]); // start bob and immediately place the order - let mm_bob = MarketMakerIt::start( + let mut mm_bob = MarketMakerIt::start( json! ({ "gui": "nogui", "netid": 9998, @@ -2159,7 +2157,7 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { let rc = block_on(mm_bob.rpc(&set_price_json)).unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - let mm_alice = MarketMakerIt::start( + let mut mm_alice = MarketMakerIt::start( json! ({ "gui": "nogui", "netid": 9998, @@ -2204,9 +2202,13 @@ fn set_price_with_cancel_previous_should_broadcast_cancelled_message() { let rc = block_on(mm_bob.rpc(&set_price_json)).unwrap(); assert!(rc.0.is_success(), "!setprice: {}", rc.1); - let pause = 2; - log!("Waiting ({} seconds) for Bob to broadcast messages…", pause); - thread::sleep(Duration::from_secs(pause)); + block_on(mm_bob.wait_for_log(10., |log| log.contains("maker_order_cancelled_p2p_notify called"))) + .expect("Cancel broadcast not seen in Bob's logs within the expected time"); + + block_on(mm_alice.wait_for_log(10., |log| { + log.contains("received ordermatch message MakerOrderCancelled") + })) + .expect("Alice did not log MakerOrderCancelled in time"); // Bob orderbook must show 1 order log!("Get RICK/MORTY orderbook on Bob side"); diff --git a/mm2src/mm2_main/tests/mm2_tests_main.rs b/mm2src/mm2_main/tests/mm2_tests_main.rs index 83fdf13071..f38a62a8f3 100644 --- a/mm2src/mm2_main/tests/mm2_tests_main.rs +++ b/mm2src/mm2_main/tests/mm2_tests_main.rs @@ -1,4 +1,4 @@ -#![cfg(not(target_arch = "wasm32"))] +#![cfg(all(not(target_arch = "wasm32"), feature = "for-tests"))] mod integration_tests_common; mod mm2_tests;