Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use cosmrs::tendermint::abci::{Code as TxCode, EventAttribute};
use cosmrs::tx::Fee;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmResult;
use mm2_event_stream::StreamingManager;
use mm2_event_stream::{DeriveStreamerId, StreamingManager};
use mm2_number::BigDecimal;
use mm2_state_machine::prelude::*;
use mm2_state_machine::state_machine::StateMachineTrait;
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/rpc_clients/electrum_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures01::Future;
use itertools::Itertools;
use mm2_event_stream::{StreamingManager, StreamingManagerError};
use mm2_event_stream::{DeriveStreamerId, StreamingManager, StreamingManagerError};
use serde_json::{self as json, Value as Json};

type ElectrumTxHistory = Vec<ElectrumTxHistoryItem>;
Expand Down
12 changes: 7 additions & 5 deletions mm2src/coins/utxo/tx_history_events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::TransactionDetails;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand All @@ -9,12 +9,14 @@ pub struct TxHistoryEventStreamer {
coin: String,
}

impl TxHistoryEventStreamer {
#[inline(always)]
pub fn new(coin: String) -> Self { Self { coin } }
impl<'a> DeriveStreamerId<'a> for TxHistoryEventStreamer {
type InitParam = String;
type DeriveParam = &'a str;

fn new(coin: Self::InitParam) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
}

#[async_trait]
Expand Down
11 changes: 7 additions & 4 deletions mm2src/coins/utxo/utxo_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use common::log;
use futures::channel::oneshot;
use futures::StreamExt;
use keys::Address;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};
use std::collections::{HashMap, HashSet};

macro_rules! try_or_continue {
Expand All @@ -31,16 +31,19 @@ pub struct UtxoBalanceEventStreamer {
coin: UtxoStandardCoin,
}

impl UtxoBalanceEventStreamer {
pub fn new(utxo_arc: UtxoArc) -> Self {
impl<'a> DeriveStreamerId<'a> for UtxoBalanceEventStreamer {
type InitParam = UtxoArc;
type DeriveParam = &'a str;

fn new(utxo_arc: Self::InitParam) -> Self {
Self {
// We wrap the UtxoArc in a UtxoStandardCoin for easier method accessibility.
// The UtxoArc might belong to a different coin type though.
coin: UtxoStandardCoin::from(utxo_arc),
}
}

pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/utxo_tx_history_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use common::log::{error, info};
use derive_more::Display;
use keys::Address;
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_event_stream::{DeriveStreamerId, StreamingManager};
use mm2_metrics::MetricsArc;
use mm2_number::BigDecimal;
use mm2_state_machine::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions mm2src/coins/z_coin/storage/blockdb/blockdb_sql_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use db_common::sqlite::{query_single_row, run_optimization_pragmas, rusqlite};
use itertools::Itertools;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::DeriveStreamerId;
use protobuf::Message;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand Down
12 changes: 7 additions & 5 deletions mm2src/coins/z_coin/tx_history_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::utxo::rpc_clients::UtxoRpcError;
use crate::MarketCoinOps;
use common::log;
use mm2_err_handle::prelude::MmError;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};
use rpc::v1::types::H256 as H256Json;

use async_trait::async_trait;
Expand All @@ -18,12 +18,14 @@ pub struct ZCoinTxHistoryEventStreamer {
coin: ZCoin,
}

impl ZCoinTxHistoryEventStreamer {
#[inline(always)]
pub fn new(coin: ZCoin) -> Self { Self { coin } }
impl<'a> DeriveStreamerId<'a> for ZCoinTxHistoryEventStreamer {
type InitParam = ZCoin;
type DeriveParam = &'a str;

fn new(coin: Self::InitParam) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions mm2src/coins/z_coin/tx_streaming_tests/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{CoinProtocol, DexFee, MarketCoinOps, MmCoin, PrivKeyBuildPolicy};
use common::custom_futures::timeout::FutureTimerExt;
use common::{block_on, Future01CompatExt};
use mm2_core::mm_ctx::MmCtxBuilder;
use mm2_event_stream::DeriveStreamerId;
use mm2_test_helpers::for_tests::{pirate_conf, ARRR};
use std::time::Duration;

Expand Down
12 changes: 7 additions & 5 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ use async_trait::async_trait;
use common::log::error;
use futures::channel::oneshot;
use futures_util::StreamExt;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};

pub struct ZCoinBalanceEventStreamer {
coin: ZCoin,
}

impl ZCoinBalanceEventStreamer {
#[inline(always)]
pub fn new(coin: ZCoin) -> Self { Self { coin } }
impl<'a> DeriveStreamerId<'a> for ZCoinBalanceEventStreamer {
type InitParam = ZCoin;
type DeriveParam = &'a str;

fn new(coin: Self::InitParam) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
fn derive_streamer_id(coin: Self::DeriveParam) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_event_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ pub mod streamer_ids;
pub use configuration::EventStreamingConfiguration;
pub use event::Event;
pub use manager::{StreamingManager, StreamingManagerError};
pub use streamer::{Broadcaster, EventStreamer, NoDataIn, StreamHandlerInput};
pub use streamer::{Broadcaster, DeriveStreamerId, EventStreamer, NoDataIn, StreamHandlerInput};
pub use streamer_ids::StreamerId;
8 changes: 8 additions & 0 deletions mm2src/mm2_event_stream/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ where
);
}

pub trait DeriveStreamerId<'a> {
Comment thread
onur-ozkan marked this conversation as resolved.
type InitParam;
type DeriveParam: 'a;

fn new(param: Self::InitParam) -> Self;
fn derive_streamer_id(param: Self::DeriveParam) -> StreamerId;
}

/// Spawns the [`EventStreamer::handle`] in a separate task using [`WeakSpawner`].
///
/// Returns a [`oneshot::Sender`] to shutdown the handler and an optional [`mpsc::UnboundedSender`]
Expand Down
24 changes: 13 additions & 11 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use http::Response;
use keys::{AddressFormat, KeyPair};
use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_event_stream::StreamingManager;
use mm2_event_stream::{DeriveStreamerId, StreamingManager};
use mm2_libp2p::application::request_response::ordermatch::OrdermatchRequest;
use mm2_libp2p::application::request_response::P2PRequest;
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, PublicKey, TopicHash, TopicPrefix,
Expand Down Expand Up @@ -2737,9 +2737,10 @@ impl Orderbook {
.insert(order.uuid);

self.streaming_manager
.send_fn(&OrderbookStreamer::derive_streamer_id(&order.base, &order.rel), || {
OrderbookItemChangeEvent::NewOrUpdatedItem(Box::new(order.clone().into()))
})
.send_fn(
&OrderbookStreamer::derive_streamer_id((&order.base, &order.rel)),
|| OrderbookItemChangeEvent::NewOrUpdatedItem(Box::new(order.clone().into())),
)
.ok();
self.order_set.insert(order.uuid, order);
}
Expand Down Expand Up @@ -2806,9 +2807,10 @@ impl Orderbook {
}

self.streaming_manager
.send_fn(&OrderbookStreamer::derive_streamer_id(&order.base, &order.rel), || {
OrderbookItemChangeEvent::RemovedItem(order.uuid)
})
.send_fn(
&OrderbookStreamer::derive_streamer_id((&order.base, &order.rel)),
|| OrderbookItemChangeEvent::RemovedItem(order.uuid),
)
.ok();
Some(order)
}
Expand Down Expand Up @@ -3908,7 +3910,7 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg:
};

ctx.event_stream_manager
.send_fn(&OrderStatusStreamer::derive_streamer_id(), || {
.send_fn(&OrderStatusStreamer::derive_streamer_id(()), || {
OrderStatusEvent::TakerMatch(taker_match.clone())
})
.ok();
Expand Down Expand Up @@ -3963,7 +3965,7 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected:
}

ctx.event_stream_manager
.send_fn(&OrderStatusStreamer::derive_streamer_id(), || {
.send_fn(&OrderStatusStreamer::derive_streamer_id(()), || {
OrderStatusEvent::TakerConnected(order_match.clone())
})
.ok();
Expand Down Expand Up @@ -4079,7 +4081,7 @@ async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request:
};

ctx.event_stream_manager
.send_fn(&OrderStatusStreamer::derive_streamer_id(), || {
.send_fn(&OrderStatusStreamer::derive_streamer_id(()), || {
OrderStatusEvent::MakerMatch(maker_match.clone())
})
.ok();
Expand Down Expand Up @@ -4150,7 +4152,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg
let order_match = order_match.clone();

ctx.event_stream_manager
.send_fn(&OrderStatusStreamer::derive_streamer_id(), || {
.send_fn(&OrderStatusStreamer::derive_streamer_id(()), || {
OrderStatusEvent::MakerConnected(order_match.clone())
})
.ok();
Expand Down
15 changes: 8 additions & 7 deletions mm2src/mm2_main/src/lp_ordermatch/order_events.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use super::{MakerMatch, TakerMatch};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};

use async_trait::async_trait;
use futures::channel::oneshot;
use futures::StreamExt;

pub struct OrderStatusStreamer;

impl OrderStatusStreamer {
#[inline(always)]
pub fn new() -> Self { Self }
impl DeriveStreamerId<'_> for OrderStatusStreamer {
type InitParam = ();
type DeriveParam = ();

#[inline(always)]
pub const fn derive_streamer_id() -> StreamerId { StreamerId::OrderStatus }
fn new(_: Self::InitParam) -> Self { Self }

fn derive_streamer_id(_: Self::DeriveParam) -> StreamerId { StreamerId::OrderStatus }
}

#[derive(Serialize)]
Expand All @@ -28,7 +29,7 @@ pub enum OrderStatusEvent {
impl EventStreamer for OrderStatusStreamer {
type DataInType = OrderStatusEvent;

fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id() }
fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(()) }

async fn handle(
self,
Expand Down
13 changes: 8 additions & 5 deletions mm2src/mm2_main/src/lp_ordermatch/orderbook_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{orderbook_topic_from_base_rel, subscribe_to_orderbook_topic, OrderbookP2PItem};
use coins::{is_wallet_only_ticker, lp_coinfind};
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use mm2_event_stream::{Broadcaster, DeriveStreamerId, Event, EventStreamer, StreamHandlerInput, StreamerId};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand All @@ -14,10 +14,13 @@ pub struct OrderbookStreamer {
rel: String,
}

impl OrderbookStreamer {
pub fn new(ctx: MmArc, base: String, rel: String) -> Self { Self { ctx, base, rel } }
impl<'a> DeriveStreamerId<'a> for OrderbookStreamer {
type InitParam = (MmArc, String, String);
type DeriveParam = (&'a str, &'a str);
Comment thread
onur-ozkan marked this conversation as resolved.
Outdated

pub fn derive_streamer_id(base: &str, rel: &str) -> StreamerId {
fn new((ctx, base, rel): Self::InitParam) -> Self { Self { ctx, base, rel } }

fn derive_streamer_id((base, rel): Self::DeriveParam) -> StreamerId {
StreamerId::OrderbookUpdate {
topic: orderbook_topic_from_base_rel(base, rel),
}
Expand All @@ -38,7 +41,7 @@ pub enum OrderbookItemChangeEvent {
impl EventStreamer for OrderbookStreamer {
type DataInType = OrderbookItemChangeEvent;

fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(&self.base, &self.rel) }
fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id((&self.base, &self.rel)) }

async fn handle(
self,
Expand Down
9 changes: 6 additions & 3 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use futures::{compat::Future01CompatExt, select, FutureExt};
use keys::KeyPair;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::DeriveStreamerId;
use mm2_number::{BigDecimal, MmNumber};
use mm2_rpc::data::legacy::OrderConfirmationsSettings;
use parking_lot::Mutex as PaMutex;
Expand Down Expand Up @@ -2190,9 +2191,11 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
drop(dispatcher);
// Send a notification to the swap status streamer about a new event.
ctx.event_stream_manager
.send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::MakerV1 {
uuid: running_swap.uuid,
event: to_save.clone(),
.send_fn(&SwapStatusStreamer::derive_streamer_id(()), || {
SwapStatusEvent::MakerV1 {
uuid: running_swap.uuid,
event: to_save.clone(),
}
})
.ok();
save_my_maker_swap_event(&ctx, &running_swap, to_save)
Expand Down
9 changes: 6 additions & 3 deletions mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crypto::secret_hash_algo::SecretHashAlgo;
use keys::KeyPair;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_event_stream::DeriveStreamerId;
use mm2_libp2p::Secp256k1PubkeySerialize;
use mm2_number::MmNumber;
use mm2_state_machine::prelude::*;
Expand Down Expand Up @@ -792,9 +793,11 @@ impl<MakerCoin: MmCoin + MakerCoinSwapOpsV2, TakerCoin: MmCoin + TakerCoinSwapOp
// Send a notification to the swap status streamer about a new event.
self.ctx
.event_stream_manager
.send_fn(&SwapStatusStreamer::derive_streamer_id(), || SwapStatusEvent::MakerV2 {
uuid: self.uuid,
event: event.clone(),
.send_fn(&SwapStatusStreamer::derive_streamer_id(()), || {
SwapStatusEvent::MakerV2 {
uuid: self.uuid,
event: event.clone(),
}
})
.ok();
}
Expand Down
Loading