Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions mm2src/coins/eth/eth_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{eth::{u256_to_big_decimal, Erc20TokenDetails},
BalanceError, CoinWithDerivationMethod};
use common::{executor::Timer, log, Future01CompatExt};
use mm2_err_handle::prelude::MmError;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId};
use mm2_number::BigDecimal;

use async_trait::async_trait;
Expand Down Expand Up @@ -146,15 +146,19 @@ async fn fetch_balance(
impl EventStreamer for EthBalanceEventStreamer {
type DataInType = NoDataIn;

fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker) }
fn streamer_id(&self) -> StreamerId {
StreamerId::Balance {
coin: self.coin.ticker.to_string(),
}
}

async fn handle(
self,
broadcaster: Broadcaster,
ready_tx: oneshot::Sender<Result<(), String>>,
_: impl StreamHandlerInput<NoDataIn>,
) {
async fn start_polling(streamer_id: String, broadcaster: Broadcaster, coin: EthCoin, interval: f64) {
async fn start_polling(streamer_id: StreamerId, broadcaster: Broadcaster, coin: EthCoin, interval: f64) {
async fn sleep_remaining_time(interval: f64, now: Instant) {
// If the interval is x seconds,
// our goal is to broadcast changed balances every x seconds.
Expand Down
8 changes: 6 additions & 2 deletions mm2src/coins/eth/fee_estimation/eth_fee_events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::ser::FeePerGasEstimated;
use crate::eth::EthCoin;
use common::executor::Timer;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId};

use async_trait::async_trait;
use compatible_time::Instant;
Expand Down Expand Up @@ -52,7 +52,11 @@ impl EthFeeEventStreamer {
impl EventStreamer for EthFeeEventStreamer {
type DataInType = NoDataIn;

fn streamer_id(&self) -> String { format!("FEE_ESTIMATION:{}", self.coin.ticker) }
fn streamer_id(&self) -> StreamerId {
StreamerId::FeeEstimation {
coin: self.coin.ticker.to_string(),
}
}

async fn handle(
self,
Expand Down
8 changes: 6 additions & 2 deletions mm2src/coins/tendermint/tendermint_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use common::{http_uri_to_ws_address, log, PROXY_REQUEST_EXPIRATION_SEC};
use futures::channel::oneshot;
use futures_util::{SinkExt, StreamExt};
use jsonrpc_core::{Id as RpcId, Params as RpcParams, Value as RpcValue, Version as RpcVersion};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, NoDataIn, StreamHandlerInput, StreamerId};
use mm2_number::BigDecimal;
use proxy_signature::RawMessage;
use std::collections::{HashMap, HashSet};
Expand All @@ -23,7 +23,11 @@ impl TendermintBalanceEventStreamer {
impl EventStreamer for TendermintBalanceEventStreamer {
type DataInType = NoDataIn;

fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker()) }
fn streamer_id(&self) -> StreamerId {
StreamerId::Balance {
coin: self.coin.ticker().to_string(),
}
}

async fn handle(
self,
Expand Down
6 changes: 3 additions & 3 deletions mm2src/coins/utxo/tx_history_events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::TransactionDetails;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand All @@ -14,14 +14,14 @@ impl TxHistoryEventStreamer {
pub fn new(coin: String) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") }
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
}

#[async_trait]
impl EventStreamer for TxHistoryEventStreamer {
type DataInType = Vec<TransactionDetails>;

fn streamer_id(&self) -> String { Self::derive_streamer_id(&self.coin) }
fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(&self.coin) }

async fn handle(
self,
Expand Down
10 changes: 7 additions & 3 deletions mm2src/coins/utxo/utxo_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use common::log;
use futures::channel::oneshot;
use futures::StreamExt;
use keys::Address;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use std::collections::{HashMap, HashSet};

macro_rules! try_or_continue {
Expand Down Expand Up @@ -40,14 +40,18 @@ impl UtxoBalanceEventStreamer {
}
}

pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") }
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
}

#[async_trait]
impl EventStreamer for UtxoBalanceEventStreamer {
type DataInType = ScripthashNotification;

fn streamer_id(&self) -> String { format!("BALANCE:{}", self.coin.ticker()) }
fn streamer_id(&self) -> StreamerId {
StreamerId::Balance {
coin: self.coin.ticker().to_string(),
}
}

async fn handle(
self,
Expand Down
6 changes: 3 additions & 3 deletions mm2src/coins/z_coin/tx_history_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::utxo::rpc_clients::UtxoRpcError;
use crate::MarketCoinOps;
use common::log;
use mm2_err_handle::prelude::MmError;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};
use rpc::v1::types::H256 as H256Json;

use async_trait::async_trait;
Expand All @@ -23,14 +23,14 @@ impl ZCoinTxHistoryEventStreamer {
pub fn new(coin: ZCoin) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("TX_HISTORY:{coin}") }
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::TxHistory { coin: coin.to_string() } }
}

#[async_trait]
impl EventStreamer for ZCoinTxHistoryEventStreamer {
type DataInType = Vec<WalletTx<Nullifier>>;

fn streamer_id(&self) -> String { Self::derive_streamer_id(self.coin.ticker()) }
fn streamer_id(&self) -> StreamerId { Self::derive_streamer_id(self.coin.ticker()) }

async fn handle(
self,
Expand Down
8 changes: 4 additions & 4 deletions mm2src/coins/z_coin/tx_streaming_tests/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ fn test_zcoin_tx_streaming() {
.expect("tx history sender shutdown");

log!("{:?}", event.get());
let (event_type, event_data) = event.get();
let (streamer_id, event_data) = event.get();
// Make sure this is not an error event,
assert!(!event_type.starts_with("ERROR_"));
assert!(!streamer_id.starts_with("ERROR:"));
Comment thread
BigFish2086 marked this conversation as resolved.
// from the expected streamer,
assert_eq!(
event_type,
ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker())
streamer_id,
ZCoinTxHistoryEventStreamer::derive_streamer_id(coin.ticker()).to_string()
);
// and has the expected data.
assert_eq!(event_data["tx_hash"].as_str().unwrap(), tx.txid().to_string());
Expand Down
10 changes: 7 additions & 3 deletions mm2src/coins/z_coin/z_balance_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use common::log::error;
use futures::channel::oneshot;
use futures_util::StreamExt;
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput};
use mm2_event_stream::{Broadcaster, Event, EventStreamer, StreamHandlerInput, StreamerId};

pub struct ZCoinBalanceEventStreamer {
coin: ZCoin,
Expand All @@ -17,14 +17,18 @@ impl ZCoinBalanceEventStreamer {
pub fn new(coin: ZCoin) -> Self { Self { coin } }

#[inline(always)]
pub fn derive_streamer_id(coin: &str) -> String { format!("BALANCE:{coin}") }
pub fn derive_streamer_id(coin: &str) -> StreamerId { StreamerId::Balance { coin: coin.to_string() } }
}

#[async_trait]
impl EventStreamer for ZCoinBalanceEventStreamer {
type DataInType = ();

fn streamer_id(&self) -> String { Self::derive_streamer_id(self.coin.ticker()) }
fn streamer_id(&self) -> StreamerId {
StreamerId::Balance {
coin: self.coin.ticker().to_string(),
}
}

async fn handle(
self,
Expand Down
12 changes: 7 additions & 5 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use derive_more::Display;
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use mm2_err_handle::prelude::*;
use mm2_event_stream::Event;
use mm2_event_stream::{Event, StreamerId};
use ser_error_derive::SerializeErrorType;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand All @@ -16,8 +16,6 @@ use timed_map::{MapKind, TimedMap};

use crate::mm_ctx::{MmArc, MmCtx};

const EVENT_NAME: &str = "DATA_NEEDED";

#[derive(Clone, Debug)]
pub struct DataAsker {
data_id: Arc<AtomicUsize>,
Expand Down Expand Up @@ -81,8 +79,12 @@ impl MmCtx {
"data": data
});

self.event_stream_manager
.broadcast_all(Event::new(format!("{EVENT_NAME}:{data_type}"), input));
self.event_stream_manager.broadcast_all(Event::new(
StreamerId::DataNeeded {
data_type: data_type.to_string(),
},
input,
));

match receiver.timeout(timeout).await {
Ok(Ok(response)) => match serde_json::from_value::<Output>(response) {
Expand Down
22 changes: 13 additions & 9 deletions mm2src/mm2_event_stream/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::StreamerId;
use serde_json::Value as Json;

// Note `Event` shouldn't be `Clone`able, but rather Arc/Rc wrapped and then shared.
// This is only for testing.
/// Multi-purpose/generic event type that can easily be used over the event streaming
#[cfg_attr(any(test, target_arch = "wasm32"), derive(Clone, Debug, PartialEq))]
#[derive(Default)]
pub struct Event {
/// The type of the event (balance, network, swap, etc...).
event_type: String,
streamer_id: StreamerId,
/// The message to be sent to the client.
message: Json,
/// Indicating whether this event is an error event or a normal one.
Expand All @@ -17,31 +17,35 @@ pub struct Event {
impl Event {
/// Creates a new `Event` instance with the specified event type and message.
#[inline(always)]
pub fn new(streamer_id: String, message: Json) -> Self {
pub fn new(streamer_id: StreamerId, message: Json) -> Self {
Self {
event_type: streamer_id,
streamer_id,
message,
error: false,
}
}

/// Create a new error `Event` instance with the specified error event type and message.
#[inline(always)]
pub fn err(streamer_id: String, message: Json) -> Self {
pub fn err(streamer_id: StreamerId, message: Json) -> Self {
Self {
event_type: streamer_id,
streamer_id,
message,
error: true,
}
}

/// Returns the `event_type` (the ID of the streamer firing this event).
/// Returns whether this event is an error or not
#[inline(always)]
pub fn origin(&self) -> &str { &self.event_type }
pub fn is_error(&self) -> bool { self.error }

/// Returns the `streamer_id` (the ID of the streamer firing this event).
#[inline(always)]
pub fn origin(&self) -> &StreamerId { &self.streamer_id }

/// Returns the event type and message as a pair.
pub fn get(&self) -> (String, &Json) {
let prefix = if self.error { "ERROR:" } else { "" };
(format!("{prefix}{}", self.event_type), &self.message)
(format!("{prefix}{}", self.streamer_id), &self.message)
}
}
2 changes: 2 additions & 0 deletions mm2src/mm2_event_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ pub mod configuration;
pub mod event;
pub mod manager;
pub mod streamer;
pub mod streamer_ids;

// Re-export important types.
pub use configuration::EventStreamingConfiguration;
pub use event::Event;
pub use manager::{StreamingManager, StreamingManagerError};
pub use streamer::{Broadcaster, EventStreamer, NoDataIn, StreamHandlerInput};
pub use streamer_ids::StreamerId;
Loading
Loading