Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions mm2src/coins/lp_price.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use common::log::{debug, error};
use common::log::{debug, error, info};
use common::StatusCode;
use mm2_err_handle::prelude::{MmError, OrMmError};
use mm2_net::transport::SlurpError;
Expand All @@ -10,7 +10,7 @@ use std::collections::HashMap;
#[cfg(feature = "run-docker-tests")] use std::str::FromStr;
use std::str::Utf8Error;

const PRICE_ENDPOINTS: [&str; 3] = [
pub const PRICE_ENDPOINTS: [&str; 3] = [
"https://prices.komodian.info/api/v2/tickers",
"https://prices.cipig.net:1717/api/v2/tickers",
"https://defi-stats.komodo.earth/api/v3/prices/tickers_v2",
Expand Down Expand Up @@ -209,10 +209,26 @@ async fn process_price_request(price_url: &str) -> Result<TickerInfosRegistry, M
Ok(TickerInfosRegistry(model))
}

pub async fn fetch_price_tickers(price_url: &str) -> Result<TickerInfosRegistry, MmError<PriceServiceRequestError>> {
let model = process_price_request(price_url).await?;
debug!("price registry size: {}", model.0.len());
Ok(model)
pub async fn fetch_price_tickers(
price_urls: &mut [String],
) -> Result<TickerInfosRegistry, MmError<PriceServiceRequestError>> {
for (i, url) in price_urls.to_owned().iter().enumerate() {
let model = match process_price_request(url).await {
Ok(model) => model,
Err(err) => {
error!("Error fetching price from: {}, error: {:?}", url, err);
continue;
},
};
price_urls.rotate_left(i);
debug!("price registry size: {}", model.0.len());
info!("price successfully fetched from {url}");
return Ok(model);
}

MmError::err(PriceServiceRequestError::HttpProcessError(
"couldn't fetch price".to_string(),
))
}

/// CEXRates, structure for storing `base` coin and `rel` coin USD price
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ cfg_wasm32! {
#[path = "lp_ordermatch/best_orders.rs"] mod best_orders;
#[path = "lp_ordermatch/lp_bot.rs"] mod lp_bot;
pub use lp_bot::{start_simple_market_maker_bot, stop_simple_market_maker_bot, StartSimpleMakerBotRequest,
TradingBotEvent, KMD_PRICE_ENDPOINT};
TradingBotEvent};

#[path = "lp_ordermatch/my_orders_storage.rs"]
mod my_orders_storage;
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/lp_ordermatch/lp_bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::mm2::lp_ordermatch::lp_bot::simple_market_maker_bot::{tear_down_bot,
PRECISION_FOR_NOTIFICATION};
use crate::mm2::lp_swap::MakerSwapStatusChanged;
pub use simple_market_maker_bot::{start_simple_market_maker_bot, stop_simple_market_maker_bot,
StartSimpleMakerBotRequest, KMD_PRICE_ENDPOINT};
StartSimpleMakerBotRequest};

#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "simple_market_maker_tests.rs"]
Expand Down Expand Up @@ -93,7 +93,7 @@ impl From<TradingBotStarted> for TradingBotEvent {
pub struct RunningState {
trading_bot_cfg: SimpleMakerBotRegistry,
bot_refresh_rate: f64,
price_url: String,
price_urls: Vec<String>,
}

pub struct StoppingState {
Expand Down
136 changes: 74 additions & 62 deletions mm2src/mm2_main/src/lp_ordermatch/simple_market_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::mm2::{lp_ordermatch::{cancel_order, create_maker_order,
update_maker_order, CancelOrderReq, MakerOrder, MakerOrderUpdateReq,
OrdermatchContext, SetPriceReq},
lp_swap::{latest_swaps_for_pair, LatestSwapsErr}};
use coins::lp_price::{fetch_price_tickers, Provider, RateInfos};
use coins::lp_price::{fetch_price_tickers, Provider, RateInfos, PRICE_ENDPOINTS};
use coins::{lp_coinfind, GetNonZeroBalance};
use common::{executor::{SpawnFuture, Timer},
log::{debug, error, info, warn},
Expand All @@ -23,7 +23,6 @@ use std::collections::{HashMap, HashSet};
use uuid::Uuid;

// !< constants
pub const KMD_PRICE_ENDPOINT: &str = "https://prices.komodian.info/api/v2/tickers";
pub const BOT_DEFAULT_REFRESH_RATE: f64 = 30.0;
pub const PRECISION_FOR_NOTIFICATION: u64 = 8;
const LATEST_SWAPS_LIMIT: usize = 1000;
Expand Down Expand Up @@ -110,10 +109,36 @@ impl From<std::string::String> for OrderProcessingError {
fn from(error: std::string::String) -> Self { OrderProcessingError::LegacyError(error) }
}

#[derive(Deserialize)]
enum PriceSources {
#[serde(rename = "price_url")]
Singular(String),
#[serde(rename = "price_urls")]
Multiple(Vec<String>),
}

impl Default for PriceSources {
fn default() -> Self { PriceSources::Multiple(PRICE_ENDPOINTS.iter().map(ToString::to_string).collect()) }
}

impl PriceSources {
/// # Important
///
/// Always use this to get the data
fn get_urls(&self) -> Vec<String> {
match self {
// TODO: deprecate price_url soon and inform the users
PriceSources::Singular(url) => vec![url.clone()],
PriceSources::Multiple(urls) => urls.clone(),
}
}
}

#[derive(Deserialize)]
pub struct StartSimpleMakerBotRequest {
cfg: SimpleMakerBotRegistry,
price_url: Option<String>,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please leave the singular price_url for backwards compatibility in the short term? e.g. translate it into a list with single item?
During netid migration it would be optimal to avoid any maker config issues until transition is complete.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error is thrown if using the old param, looks like it uses the defaults. I'll can mark it as deprecated in docs, could we also add a log message in the bot update loop when it is used to warn users of the deprecation?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we also add a log message in the bot update loop when it is used to warn users of the deprecation?

Mmm, I guess to do that I have to keep price_url in addition to price_urls to check if it's used or not. If used, what would you like the behaviour to be of the 2 below options:
1 - Translate it into a list with single item and log a warning message.
2 - Use default list and log warning message.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A third option is to revert price_urls to price_url and make it accept either a list or a string, but it won't be very good in my opinion since we need to inform users about the change.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A third option is to revert price_urls to price_url and make it accept either a list or a string, but it won't be very good in my opinion since we need to inform users about the change.

I would support both at the same time with an enum.

e.g.,:

enum PriceSourceType {
    #[serde(rename = "price_url")]
    Singular(String),
    #[serde(rename = "price_urls")]
    Multiple(Vec<String>),
}

impl PriceSourceType {
    /// Always use this to get the data
    fn get_urls(&self) -> Vec<String> {
        match self {
            // TODO: deprecate price_url soon and inform the users
            PriceSourceType::Singular(t) => vec![t.clone()],
            PriceSourceType::Multiple(t) => t.clone(),
        }
    }
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

#[serde(default, flatten)]
price_sources: PriceSources,
bot_refresh_rate: Option<f64>,
}

Expand Down Expand Up @@ -614,81 +639,68 @@ async fn execute_create_single_order(

async fn process_bot_logic(ctx: &MmArc) {
let simple_market_maker_bot_ctx = TradingBotContext::from_ctx(ctx).unwrap();
let state = simple_market_maker_bot_ctx.trading_bot_states.lock().await;
let (cfg, price_url) = if let TradingBotState::Running(running_state) = &*state {
let res = (running_state.trading_bot_cfg.clone(), running_state.price_url.clone());
drop(state);
res
} else {
drop(state);
return;
let mut state = simple_market_maker_bot_ctx.trading_bot_states.lock().await;
let running_state = match &mut *state {
TradingBotState::Running(running_state) => running_state,
TradingBotState::Stopping(_) | TradingBotState::Stopped(_) => return,
};
let rates_registry = match fetch_price_tickers(price_url.as_str()).await {
Ok(model) => {
info!("price successfully fetched from {price_url}");
model
},

let cfg = running_state.trading_bot_cfg.clone();
let rates_registry = match fetch_price_tickers(&mut running_state.price_urls).await {
Ok(model) => model,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 let cfg = running_state.trading_bot_cfg.clone();
    let rates_registry = match fetch_price_tickers(&mut running_state.price_urls).await {
        Ok(model) => model,
        Err(err) => {
            let nb_orders = cancel_pending_orders(ctx, &cfg).await;
            error!("error fetching price: {err:?} - cancel {nb_orders} orders");
            return;
        },
    };

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Err(err) => {
let nb_orders = cancel_pending_orders(ctx, &cfg).await;
error!("error fetching price: {err:?} - cancel {nb_orders} orders");
return;
},
};

drop(state);

let mut memoization_pair_registry: HashSet<String> = HashSet::new();
let ordermatch_ctx = OrdermatchContext::from_ctx(ctx).unwrap();
let maker_orders = ordermatch_ctx.maker_orders_ctx.lock().orders.clone();
let mut futures_order_update = Vec::with_capacity(0);
// Iterating over maker orders and update order that are present in cfg as the key_trade_pair e.g KMD/LTC
for (uuid, order_mutex) in maker_orders.into_iter() {
let mut futures_order_update = Vec::with_capacity(maker_orders.len());
for (uuid, order_mutex) in maker_orders {
let order = order_mutex.lock().await;
let key_trade_pair = TradingPair::new(order.base.clone(), order.rel.clone());
match cfg.get(&key_trade_pair.as_combination()) {
Some(coin_cfg) => {
if !coin_cfg.enable {
continue;
}
let cloned_infos = (
ctx.clone(),
rates_registry
.get_cex_rates(&coin_cfg.base, &coin_cfg.rel)
.unwrap_or_default(),
key_trade_pair.clone(),
coin_cfg.clone(),
);
futures_order_update.push(execute_update_order(uuid, order.clone(), cloned_infos));
memoization_pair_registry.insert(key_trade_pair.as_combination());
},
_ => continue,

if let Some(coin_cfg) = cfg.get(&key_trade_pair.as_combination()) {
if !coin_cfg.enable {
continue;
}
let cloned_infos = (
ctx.clone(),
rates_registry
.get_cex_rates(&coin_cfg.base, &coin_cfg.rel)
.unwrap_or_default(),
key_trade_pair.clone(),
coin_cfg.clone(),
);
futures_order_update.push(execute_update_order(uuid, order.clone(), cloned_infos));
memoization_pair_registry.insert(key_trade_pair.as_combination());
}
}

let all_updated_orders_tasks = futures::future::join_all(futures_order_update);
let _results_order_updates = all_updated_orders_tasks.await;
let _results_order_updates = futures::future::join_all(futures_order_update).await;

let mut futures_order_creation = Vec::with_capacity(0);
let mut futures_order_creation = Vec::with_capacity(cfg.len());
// Now iterate over the registry and for every pairs that are not hit let's create an order
for (trading_pair, cur_cfg) in cfg.into_iter() {
match memoization_pair_registry.get(&trading_pair) {
Some(_) => continue,
None => {
if !cur_cfg.enable {
continue;
}
let rates_infos = rates_registry
.get_cex_rates(&cur_cfg.base, &cur_cfg.rel)
.unwrap_or_default();
futures_order_creation.push(execute_create_single_order(
rates_infos,
cur_cfg,
trading_pair.clone(),
ctx,
));
},
};
for (trading_pair, cur_cfg) in cfg {
if memoization_pair_registry.get(&trading_pair).is_some() || !cur_cfg.enable {
continue;
}
let rates_infos = rates_registry
.get_cex_rates(&cur_cfg.base, &cur_cfg.rel)
.unwrap_or_default();
futures_order_creation.push(execute_create_single_order(
rates_infos,
cur_cfg,
trading_pair.clone(),
ctx,
));
}
let all_created_orders_tasks = futures::future::join_all(futures_order_creation);
let _results_order_creations = all_created_orders_tasks.await;
let _results_order_creations = futures::future::join_all(futures_order_creation).await;
}

pub async fn lp_bot_loop(ctx: MmArc) {
Expand Down Expand Up @@ -738,7 +750,7 @@ pub async fn start_simple_market_maker_bot(ctx: MmArc, req: StartSimpleMakerBotR
*state = RunningState {
trading_bot_cfg: req.cfg,
bot_refresh_rate: refresh_rate,
price_url: req.price_url.unwrap_or_else(|| KMD_PRICE_ENDPOINT.to_string()),
price_urls: req.price_sources.get_urls(),
}
.into();
drop(state);
Expand Down Expand Up @@ -793,15 +805,15 @@ mod tests {
let another_cloned_ctx = ctx.clone();
let req = StartSimpleMakerBotRequest {
cfg: Default::default(),
price_url: None,
price_sources: Default::default(),
bot_refresh_rate: None,
};
let answer = block_on(start_simple_market_maker_bot(ctx, req)).unwrap();
assert_eq!(answer.get_result(), "Success");

let req = StartSimpleMakerBotRequest {
cfg: Default::default(),
price_url: None,
price_sources: Default::default(),
bot_refresh_rate: None,
};
let answer = block_on(start_simple_market_maker_bot(cloned_ctx, req));
Expand Down