diff --git a/Cargo.lock b/Cargo.lock index 3a7a0574717..320e5f8cba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5120,10 +5120,12 @@ dependencies = [ "custom_derive", "enum_derive", "env_logger 0.9.0", + "fnv", "futures 0.3.15", "jsonrpc-client-transports 18.0.0", "jsonrpc-core 18.0.0", "jsonrpc-core-client 18.0.0", + "lazy_static", "log 0.4.14", "serde", "serde_json", diff --git a/pallets/oracle/src/lib.rs b/pallets/oracle/src/lib.rs index d99734711b9..0eae804b150 100644 --- a/pallets/oracle/src/lib.rs +++ b/pallets/oracle/src/lib.rs @@ -240,8 +240,7 @@ pub mod pallet { #[pallet::hooks] impl Hooks> for Pallet { fn on_initialize(block: T::BlockNumber) -> Weight { - for i in 0..AssetsCount::::get() { - let asset_info = AssetsInfo::::get(i); + for (i, asset_info) in AssetsInfo::::iter() { // TODO maybe add a check if price is requested, is less operations? let pre_pruned_prices = PrePrices::::get(i); let mut pre_prices = Vec::new(); @@ -518,7 +517,7 @@ pub mod pallet { } pub fn check_requests() { - for i in 0..AssetsCount::::get() { + for (i, _) in AssetsInfo::::iter() { if Requested::::get(i) { let _ = Self::fetch_price_and_send_signed(&i); } diff --git a/price-feed/Cargo.toml b/price-feed/Cargo.toml index 8c82c380d56..63863f27820 100644 --- a/price-feed/Cargo.toml +++ b/price-feed/Cargo.toml @@ -21,3 +21,4 @@ warp = "0.3" signal-hook = "0.3" signal-hook-tokio = { version = "0.3", features = [ "futures-v0_3" ] } futures = "0.3" +lazy_static = "1.4.0" diff --git a/price-feed/README.md b/price-feed/README.md new file mode 100644 index 00000000000..bcc966ca36f --- /dev/null +++ b/price-feed/README.md @@ -0,0 +1,19 @@ +# Draft + +Currently, in the pallet, the price of an asset is expressed in USD cents, it's not a ratio between two assets like in exchanges. +The price server handle arbitrary asset pair instead of fixing the denominator to USD, e.g. ETH/BTC, ADA/BTC... + +We currently use an arbitrary `asset_id` in the oracle pallet. +The server has a hardcoded map (u8 => Asset) to represent this ID. + +# Getting started + +1. Run an instance of the composable node. +2. Run `pythd` along with `pyth_tx` using the provided nix script: + - Open a new terminal and run `nix-shell run_pyth.nix`. + - A bash function `run` is now available to start `pythd/pyth_tx` + - Whenever you exit the terminal after having ran the `run` function, the two instances are going to be shutdown. + - You have accesss to both `pythd/pyth_tx` logs by using $PYTHD_LOG and $PYTH_TX_LOG +3. Run the price server, assuming you are running `RUST_LOG=info cargo run --bin price-feed` ![img not found](images/normal_run.png). +4. Go on your local [substrate panel](https://polkadot.js.org/apps) and add a new asset, make sure to use one of the `FNV1A64` hash as `asset_id`. +5. Trigger a price request for each `asset_id` you created and watch the oracle state machine progress. diff --git a/price-feed/images/normal_run.png b/price-feed/images/normal_run.png new file mode 100644 index 00000000000..b849bddf0c5 Binary files /dev/null and b/price-feed/images/normal_run.png differ diff --git a/price-feed/run_pyth.nix b/price-feed/run_pyth.nix index b97751620d0..bc71a933e90 100644 --- a/price-feed/run_pyth.nix +++ b/price-feed/run_pyth.nix @@ -20,28 +20,48 @@ let ''; installPhase = '' mkdir -p $out/bin + mv pyth $out/bin mv pythd $out/bin mv pyth_tx $out/bin + mv ../pctest/init_key_store.sh $out/bin ''; }; in mkShell { - packages = [ pythd ]; + buildInputs = [ pythd ]; SOLANA_ENV = "devnet"; shellHook = '' - echo "Running up pyth_tx & puthd" - export PYTH_TX_LOG=$(mktemp) - pyth_tx -l $PYTH_TX_LOG -d -r api.$SOLANA_ENV.solana.com & - export PYTH_TX_PID=$! - export PYTHD_LOG=$(mktemp) - pythd -l $PYTHD_LOG -d -r api.$SOLANA_ENV.solana.com & - export PYTHD_PID=$! - teardown() { - echo "Shuting down pyth_tx & pythd"; - kill -2 $PYTHD_PID - kill -2 $PYTH_TX_PID - rm $PYTH_TX_LOG - rm $PYTHD_LOG + export PYTHD_KEYSTORE=$HOME/.pythd + + function init_keystore() { + echo "Creating key store" + rm -rf $PYTHD_KEYSTORE || true + mkdir -m 600 -p $PYTHD_KEYSTORE + echo "Initializing key store" + ${pythd}/bin/pyth init_key -k $PYTHD_KEYSTORE + echo "Populating key store" + ${pythd}/bin/init_key_store.sh $SOLANA_ENV $PYTHD_KEYSTORE + } + + function run() { + echo "Running pyth_tx" + export PYTH_TX_LOG=$(mktemp) + ${pythd}/bin/pyth_tx -l $PYTH_TX_LOG -d -r api.$SOLANA_ENV.solana.com & + export PYTH_TX_PID=$! + + echo "Running pythd" + export PYTHD_LOG=$(mktemp) + ${pythd}/bin/pythd -k $PYTHD_KEYSTORE -l $PYTHD_LOG -d -r api.$SOLANA_ENV.solana.com & + export PYTHD_PID=$! + + function teardown() { + echo "Shuting down pyth_tx & pythd"; + kill -2 $PYTHD_PID + kill -2 $PYTH_TX_PID + rm $PYTH_TX_LOG + rm $PYTHD_LOG + } + + trap teardown EXIT } - trap teardown EXIT ''; } diff --git a/price-feed/src/asset.rs b/price-feed/src/asset.rs index b35b554b670..e5c220162dc 100644 --- a/price-feed/src/asset.rs +++ b/price-feed/src/asset.rs @@ -1,15 +1,134 @@ +use serde::Serialize; +use std::{ + collections::{HashMap, HashSet}, + convert::TryFrom, + num::ParseIntError, + str::FromStr, +}; + custom_derive! { - #[derive(EnumFromStr, Copy, Clone, PartialEq, Eq, Hash, Debug)] - pub enum Asset { - BTC, - ETH, - LTC, - USD, - } + #[derive(EnumFromStr, Copy, Clone, PartialEq, Eq, Hash, Debug)] + pub enum Asset { + BTC, + ETH, + LTC, + DOGE, + SOL, + LUNA, + AAPL, + BNB, + TSLA, + BCH, + SRM, + AMZN, + GOOG, + NFLX, + XAU, + AMC, + SPY, + GME, + GE, + QQQ, + USDT, + USDC, + GBP, + EUR, + USD, + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct AssetPair(pub Asset, pub Asset); + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize)] +#[repr(transparent)] +pub struct AssetIndex(u8); + +pub enum AssetIndexError { + NotANumber(ParseIntError), + AssetNotFound, +} + +lazy_static! { + /* + The map of valid asset we are allowed to ask price for. + We must not swap two indexes. + */ + pub static ref INDEX_TO_ASSET: HashMap = [ + (0, Asset::BTC), + (1, Asset::ETH), + (2, Asset::LTC), + (3, Asset::DOGE), + (4, Asset::SOL), + (5, Asset::LUNA), + (6, Asset::AAPL), + (7, Asset::BNB), + (6, Asset::TSLA), + (7, Asset::BCH), + (8, Asset::SRM), + (9, Asset::AMZN), + (10, Asset::GOOG), + (11, Asset::NFLX), + (12, Asset::XAU), + (13, Asset::AMC), + (14, Asset::SPY), + (15, Asset::GME), + (16, Asset::GE), + (17, Asset::QQQ), + (18, Asset::USDT), + (19, Asset::USDC), + (20, Asset::GBP), + (21, Asset::EUR), + ] + .iter() + .map(|&(i, a)| (AssetIndex(i), a)) + .collect(); + pub static ref ASSET_TO_INDEX: HashMap = + INDEX_TO_ASSET.iter().map(|(&i, &a)| (a, i)).collect(); + pub static ref VALID_ASSETS: HashSet = + INDEX_TO_ASSET.values().copied().collect(); } -pub type AssetPair = (Asset, Asset); +impl FromStr for AssetIndex { + type Err = AssetIndexError; + fn from_str(s: &str) -> Result { + let asset_pair_index = + AssetIndex(FromStr::from_str(s).map_err(AssetIndexError::NotANumber)?); + if INDEX_TO_ASSET.contains_key(&asset_pair_index) { + Ok(asset_pair_index) + } else { + Err(AssetIndexError::AssetNotFound) + } + } +} + +impl AssetPair { + /* + We currently only allow X/USD + */ + pub fn new(x: Asset, y: Asset) -> Option { + match (x, y) { + (Asset::USD, _) => None, + (_, Asset::USD) => Some(AssetPair(x, y)), + _ => None, + } + } + + pub fn symbol(&self) -> String { + format!("{:?}/{:?}", self.0, self.1) + } +} + +impl TryFrom for AssetIndex { + type Error = (); + fn try_from(asset: Asset) -> Result { + ASSET_TO_INDEX.get(&asset).copied().ok_or(()) + } +} -pub fn symbol((x, y): AssetPair) -> String { - format!("{:?}/{:?}", x, y) +impl TryFrom for Asset { + type Error = (); + fn try_from(asset_index: AssetIndex) -> Result { + INDEX_TO_ASSET.get(&asset_index).copied().ok_or(()) + } } diff --git a/price-feed/src/backend.rs b/price-feed/src/backend.rs new file mode 100644 index 00000000000..c00fe5d6cdf --- /dev/null +++ b/price-feed/src/backend.rs @@ -0,0 +1,244 @@ +use crate::{cache::Cache, feed::FeedNotification}; +use futures::stream::{Fuse, StreamExt}; +use signal_hook_tokio::SignalsInfo; +use std::{convert::TryFrom, fmt::Debug}; +use tokio::{sync::mpsc, task::JoinHandle}; + +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +pub enum FeedNotificationAction { + UpdateCache(K, V), +} + +pub trait Transition { + fn apply(&self, state: &mut S); +} + +impl Transition for FeedNotificationAction +where + TCache: Cache, + TAsset: Copy, + TPrice: Copy, +{ + fn apply(&self, state: &mut TCache) { + match self { + &FeedNotificationAction::UpdateCache(a, p) => state.insert(a, p), + } + } +} + +impl TryFrom> + for FeedNotificationAction +where + TAsset: Debug + Copy, + TPrice: Copy, +{ + type Error = (); + /* TODO: how are we going to handles X feeds: + - do we just expose every one of them from their own endpoint? + - do we merge the prices (median?), if so, merging will depend on timestamp? + On notification close, do we remove the price as we are no + longer getting new prices? + */ + fn try_from( + notification: FeedNotification, + ) -> Result, Self::Error> { + match notification { + FeedNotification::Opened(f, a) => { + log::info!("{:?} has opened a channel for {:?}", f, a); + Err(()) + } + FeedNotification::Closed(f, a) => { + log::info!("{:?} has closed a channel for {:?}", f, a); + Err(()) + } + FeedNotification::PriceUpdated(_, a, p) => { + Ok(FeedNotificationAction::UpdateCache(a, p)) + } + } + } +} + +pub struct Backend { + pub shutdown_handle: JoinHandle<()>, +} + +impl Backend { + pub async fn new( + mut prices_cache: TCache, + mut feed_channel: mpsc::Receiver, + mut shutdown_trigger: Fuse, + ) -> Backend + where + TCache: 'static + Cache + Send, + TNotification: 'static + Send + Debug, + TTransition: Transition + TryFrom, + { + let backend = async move { + 'l: loop { + tokio::select! { + _ = shutdown_trigger.next() => { + log::info!("terminating signal received."); + break 'l; + } + message = feed_channel.recv() => { + match message { + Some(notification) => { + log::debug!("notification received: {:?}", notification); + let _ = TTransition::try_from(notification) + .map(|action| { + action.apply(&mut prices_cache); + }); + } + None => { + log::info!("no more feed available... exiting handler."); + break 'l; + } + } + } + } + } + }; + + let shutdown_handle = tokio::spawn(backend); + + Backend { shutdown_handle } + } +} + +#[cfg(test)] +mod tests { + use super::Backend; + use crate::{ + asset::{Asset, VALID_ASSETS}, + backend::{FeedNotificationAction, Transition}, + cache::{PriceCache, ThreadSafePriceCache}, + feed::{Exponent, Feed, FeedNotification, Price, TimeStamp, TimeStamped, TimeStampedPrice}, + }; + use futures::stream::StreamExt; + use signal_hook_tokio::Signals; + use std::{ + collections::HashMap, + convert::TryFrom, + sync::{Arc, RwLock}, + }; + use tokio::sync::mpsc; + + #[test] + fn test_feed_notification_transition() { + let feed = Feed::Pyth; + let timestamped_price = TimeStamped { + value: (Price(0xCAFEBABE), Exponent(0x1337)), + timestamp: TimeStamp::now(), + }; + VALID_ASSETS.iter().for_each(|&asset| { + [ + (FeedNotification::Opened(feed, asset), None), + (FeedNotification::Closed(feed, asset), None), + ( + FeedNotification::PriceUpdated(feed, asset, timestamped_price), + Some(( + FeedNotificationAction::UpdateCache(asset, timestamped_price), + [(asset, timestamped_price)].iter().copied().collect(), + )), + ), + ] + .iter() + .for_each(|(notification, expected)| { + match ( + FeedNotificationAction::::try_from(*notification), + expected, + ) { + (Ok(actual_action), Some((expected_action, expected_state))) => { + assert_eq!(&actual_action, expected_action); + let mut state = HashMap::new(); + actual_action.apply(&mut state); + assert_eq!(&state, expected_state); + } + _ => { + // No action = no transition + } + } + }); + }); + } + + #[tokio::test] + async fn test_feed_backend() { + let mk_price = |x, y| TimeStamped { + value: (Price(x), Exponent(y)), + timestamp: TimeStamp::now(), + }; + let (price1, price2, price3) = (mk_price(123, -3), mk_price(3134, -1), mk_price(93424, -4)); + let feed = Feed::Pyth; + for &asset in VALID_ASSETS.iter() { + let tests = [ + ( + vec![ + FeedNotification::Opened(feed, asset), + FeedNotification::PriceUpdated(feed, asset, price1), + FeedNotification::Closed(feed, asset), + ], + [(asset, price1)], + ), + ( + vec![ + FeedNotification::Opened(feed, asset), + FeedNotification::PriceUpdated(feed, asset, price3), + FeedNotification::PriceUpdated(feed, asset, price1), + FeedNotification::PriceUpdated(feed, asset, price2), + FeedNotification::Closed(feed, asset), + ], + [(asset, price2)], + ), + ( + vec![ + FeedNotification::Opened(feed, asset), + FeedNotification::PriceUpdated(feed, asset, price2), + FeedNotification::PriceUpdated(feed, asset, price1), + FeedNotification::PriceUpdated(feed, asset, price3), + FeedNotification::Closed(feed, asset), + ], + [(asset, price3)], + ), + ]; + for (events, expected) in &tests { + let prices_cache: ThreadSafePriceCache = Arc::new(RwLock::new(HashMap::new())); + let (feed_in, feed_out) = + mpsc::channel::>(8); + let signals = Signals::new(&[]) + .expect("could not create signals stream") + .fuse(); + let backend = Backend::new::< + FeedNotification, + FeedNotificationAction, + _, + _, + _, + >(prices_cache.clone(), feed_out, signals) + .await; + + for &event in events { + feed_in + .send(event) + .await + .expect("could not send feed notification"); + } + + /* Drop the channel so that the backend exit and we join it. + This will ensure the events have been processed. + */ + drop(feed_in); + backend + .shutdown_handle + .await + .expect("could not join on backend handle"); + + let prices_cache_r = prices_cache.read().expect("could not acquire read lock"); + assert_eq!( + *prices_cache_r, + expected.iter().copied().collect::() + ); + } + } + } +} diff --git a/price-feed/src/cache.rs b/price-feed/src/cache.rs new file mode 100644 index 00000000000..217dba83663 --- /dev/null +++ b/price-feed/src/cache.rs @@ -0,0 +1,35 @@ +use crate::{asset::Asset, feed::TimeStampedPrice}; +use std::{ + collections::HashMap, + hash::Hash, + sync::{Arc, RwLock}, +}; + +pub type PriceCache = HashMap; + +pub type ThreadSafePriceCache = Arc>; + +pub trait Cache { + fn insert(&mut self, k: K, v: V); + fn get(&self, k: &K) -> Option; +} + +impl Cache for HashMap { + fn insert(&mut self, k: K, v: V) { + self.insert(k, v); + } + fn get(&self, k: &K) -> Option { + self.get(k).copied() + } +} + +impl, K: Eq + Hash, V: Copy> Cache for Arc> { + fn insert(&mut self, k: K, v: V) { + self.write() + .expect("could not acquire write lock") + .insert(k, v); + } + fn get(&self, k: &K) -> Option { + self.read().expect("could not acquire read lock").get(k) + } +} diff --git a/price-feed/src/feed/mod.rs b/price-feed/src/feed/mod.rs index 3af1739bb84..57e1d0a31ec 100644 --- a/price-feed/src/feed/mod.rs +++ b/price-feed/src/feed/mod.rs @@ -1,29 +1,44 @@ pub mod pyth; -use serde::Serialize; +use chrono::{Duration, Utc}; +use serde::{Deserialize, Serialize}; -use crate::asset::AssetPair; +#[derive(Serialize, PartialEq, Eq, Copy, Clone, Debug)] +#[repr(transparent)] +pub struct TimeStamp(pub i64); -#[derive(Serialize, Copy, Clone, Debug)] -pub struct TimeStamp(i64); +impl TimeStamp { + pub fn now() -> Self { + TimeStamp(Utc::now().timestamp()) + } + pub fn elapsed_since(&self, previous: &TimeStamp) -> Duration { + Duration::seconds(self.0 - previous.0) + } +} -#[derive(Serialize, Copy, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug)] #[repr(transparent)] -pub struct Price(u64); +pub struct Price(pub(crate) u64); -#[derive(Serialize, Copy, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Copy, Clone, Debug)] +#[repr(transparent)] +pub struct Exponent(pub(crate) i32); + +#[derive(Serialize, PartialEq, Eq, Copy, Clone, Debug)] pub struct TimeStamped { pub value: T, pub timestamp: TimeStamp, } -#[derive(PartialEq, Eq, Hash, Debug)] +pub type TimeStampedPrice = TimeStamped<(Price, Exponent)>; + +#[derive(PartialEq, Eq, Copy, Clone, Hash, Debug)] pub enum Feed { Pyth, } -#[derive(Debug)] -pub enum FeedNotification { - Opened(Feed, AssetPair), - Closed(Feed, AssetPair), - PriceUpdated(Feed, AssetPair, TimeStamped), +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +pub enum FeedNotification { + Opened(Feed, A), + Closed(Feed, A), + PriceUpdated(Feed, A, P), } diff --git a/price-feed/src/feed/pyth.rs b/price-feed/src/feed/pyth.rs index b496a41ffd0..7b1fb7cecc9 100644 --- a/price-feed/src/feed/pyth.rs +++ b/price-feed/src/feed/pyth.rs @@ -1,27 +1,39 @@ -use super::{Feed, FeedNotification, Price, TimeStamped}; +use super::{Feed, FeedNotification, Price, TimeStamped, TimeStampedPrice}; use crate::{ - asset::{symbol, AssetPair}, - feed::TimeStamp, + asset::{Asset, AssetPair, VALID_ASSETS}, + feed::{Exponent, TimeStamp}, }; -use chrono::Utc; +use futures::stream::StreamExt; use jsonrpc_client_transports::{ transports::ws::connect, RpcError, TypedClient, TypedSubscriptionStream, }; -use jsonrpc_core_client::futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::{ sync::mpsc::{self, error::SendError}, task::JoinHandle, }; +use url::Url; -#[derive(Debug, Deserialize)] -struct PythNotification { - price: u64, +pub type PythFeedNotification = FeedNotification; + +#[derive(PartialEq, Eq, Copy, Clone, Debug, Deserialize)] +#[serde(rename_all = "lowercase")] +enum PythSymbolStatus { + Trading, + Halted, + Unknown, +} + +#[derive(Copy, Clone, Debug, Deserialize)] +struct PythNotifyPrice { + status: PythSymbolStatus, + price: Price, } #[derive(Clone, Debug, Deserialize)] struct PythProductPrice { account: String, + price_exponent: Exponent, } #[derive(Clone, Debug, Deserialize)] @@ -39,7 +51,7 @@ struct PythProduct { #[derive(Debug)] pub enum PythError { RpcError(RpcError), - ChannelError(SendError), + ChannelError(SendError), } #[derive(Serialize)] @@ -52,11 +64,38 @@ pub struct Pyth { handles: Vec>>, } +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +enum PythNotifyPriceAction { + YieldFeedNotification(PythFeedNotification), +} + +fn notify_price_action( + asset: Asset, + product_price: &PythProductPrice, + notify_price: &PythNotifyPrice, + timestamp: &TimeStamp, +) -> Option { + match notify_price.status { + PythSymbolStatus::Trading => Some(PythNotifyPriceAction::YieldFeedNotification( + FeedNotification::PriceUpdated( + Feed::Pyth, + asset, + TimeStamped { + value: (notify_price.price, product_price.price_exponent), + timestamp: *timestamp, + }, + ), + )), + PythSymbolStatus::Halted => None, + PythSymbolStatus::Unknown => None, + } +} + impl Pyth { pub async fn new(url: &url::Url) -> Result { let client = connect::(url) .await - .map_err(|e| PythError::RpcError(e))?; + .map_err(PythError::RpcError)?; Ok(Pyth { client, handles: Vec::new(), @@ -64,75 +103,80 @@ impl Pyth { } async fn get_product_list(&self) -> Result, PythError> { - let result = self - .client + self.client .call_method::<(), Vec>("get_product_list", "", ()) .await - .map_err(|e| PythError::RpcError(e))?; - log::info!("Products: {:?}", result); - Ok(result) + .map_err(PythError::RpcError) } async fn subscribe( &mut self, - output: mpsc::Sender, + output: mpsc::Sender, asset_pair: AssetPair, - account: String, + product_price: PythProductPrice, ) -> Result<(), PythError> { log::info!( "Subscribing to asset pair {:?} from account {:?}", asset_pair, - account + product_price.account ); - let mut stream: TypedSubscriptionStream = self + let mut stream: TypedSubscriptionStream = self .client .subscribe( "subscribe_price", [PythSubscribeParams { - account: account.to_string(), + account: product_price.account.to_string(), }], "notify_price", "", "", ) - .map_err(|e| PythError::RpcError(e))?; + .map_err(PythError::RpcError)?; let join_handle = tokio::spawn(async move { output - .send(FeedNotification::Opened(Feed::Pyth, asset_pair)) + .send(FeedNotification::Opened(Feed::Pyth, asset_pair.0)) .await - .map_err(|e| PythError::ChannelError(e))?; + .map_err(PythError::ChannelError)?; 'a: loop { match stream.next().await { - Some(notification) => match notification { - Ok(price_notification) => { - log::info!( - "received price, {:?} = {:?}", - asset_pair, - price_notification - ); - output - .send(FeedNotification::PriceUpdated( - Feed::Pyth, - asset_pair, - TimeStamped { - value: Price(price_notification.price), - timestamp: TimeStamp(Utc::now().timestamp()), - }, - )) - .await - .map_err(|e| PythError::ChannelError(e))?; - } - _ => { - log::warn!("invalid notification?: {:?}", notification); + Some(Ok(notify_price)) => { + log::debug!( + "received notify_price, {:?}, {:?}", + asset_pair, + notify_price + ); + let timestamp = TimeStamp::now(); + match notify_price_action( + asset_pair.0, + &product_price, + ¬ify_price, + ×tamp, + ) { + Some(PythNotifyPriceAction::YieldFeedNotification( + feed_notification, + )) => { + output + .send(feed_notification) + .await + .map_err(PythError::ChannelError)?; + } + None => { + // TODO: should we close the feed if the received price don't yield a price update? + // e.g. the SymbolStatus != Trading + } } - }, + } + Some(Err(e)) => { + log::error!("unexpected rpc error: {:?}", e); + break 'a; + } None => break 'a, } } output - .send(FeedNotification::Closed(Feed::Pyth, asset_pair)) + .send(FeedNotification::Closed(Feed::Pyth, asset_pair.0)) .await - .map_err(|e| PythError::ChannelError(e))?; + .map_err(PythError::ChannelError)?; Ok(()) }); self.handles.push(join_handle); @@ -141,20 +185,21 @@ impl Pyth { pub async fn subscribe_to_asset( &mut self, - output: &mpsc::Sender, - asset_pair: AssetPair, + output: &mpsc::Sender, + asset_pair: &AssetPair, ) -> Result<(), PythError> { - let asset_pair_symbol = symbol(asset_pair); - let products = self.get_product_list().await?; - let price_accounts = products + let asset_pair_symbol = asset_pair.symbol(); + let product_prices = self + .get_product_list() + .await? .iter() .filter(|p| p.attr_dict.symbol == asset_pair_symbol) .flat_map(|p| p.price.clone()) - .map(|p| p.account.clone()) .collect::>(); - log::info!("Accounts for {:?}: {:?}", asset_pair_symbol, price_accounts); - for account in price_accounts { - self.subscribe(output.clone(), asset_pair, account).await? + log::info!("Accounts for {:?}: {:?}", asset_pair_symbol, product_prices); + for product_price in product_prices { + self.subscribe(output.clone(), *asset_pair, product_price) + .await? } Ok(()) } @@ -163,3 +208,73 @@ impl Pyth { self.handles.iter().for_each(drop); } } + +// TODO: manage multiple feeds +pub async fn run_full_subscriptions( + pythd_host: &String, +) -> (Pyth, mpsc::Receiver) { + /* Its important to drop the initial feed_in as it will be cloned for all subsequent tasks + The received won't get notified if all cloned senders are closed but not the 'main' one. + */ + let (feed_in, feed_out) = mpsc::channel::(128); + + let mut pyth = Pyth::new(&Url::parse(&pythd_host).expect("invalid pythd host address.")) + .await + .expect("connection to pythd failed"); + + // TODO: subscribe to all asset prices? cli configurable? + log::info!("successfully connected to pythd."); + for &asset in VALID_ASSETS.iter() { + if let Some(asset_pair) = AssetPair::new(asset, Asset::USD) { + pyth.subscribe_to_asset(&feed_in, &asset_pair) + .await + .expect("failed to subscribe to asset"); + } + } + + (pyth, feed_out) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{asset::*, feed::FeedNotification}; + + #[test] + fn test_notify_price_action() { + let account = "irrelevant".to_string(); + let product_price = PythProductPrice { + account, + price_exponent: Exponent(0x1337), + }; + let price = Price(0xCAFEBABE); + let timestamp = TimeStamp::now(); + VALID_ASSETS.iter().for_each(|&asset| { + [ + (PythSymbolStatus::Halted, None), + (PythSymbolStatus::Unknown, None), + ( + PythSymbolStatus::Trading, + Some(PythNotifyPriceAction::YieldFeedNotification( + FeedNotification::PriceUpdated( + Feed::Pyth, + asset, + TimeStamped { + value: (price, product_price.price_exponent), + timestamp, + }, + ), + )), + ), + ] + .iter() + .for_each(|&(status, expected_action)| { + let notify_price = PythNotifyPrice { status, price }; + assert_eq!( + expected_action, + notify_price_action(asset, &product_price, ¬ify_price, ×tamp) + ) + }); + }); + } +} diff --git a/price-feed/src/frontend.rs b/price-feed/src/frontend.rs new file mode 100644 index 00000000000..33ed4a4f17f --- /dev/null +++ b/price-feed/src/frontend.rs @@ -0,0 +1,219 @@ +use crate::{ + asset::{Asset, AssetIndex}, + cache::{PriceCache, ThreadSafePriceCache}, + feed::{Exponent, Price, TimeStamp, TimeStampedPrice}, +}; +use chrono::Duration; +use futures::channel::oneshot; +use serde::Serialize; +use std::{ + convert::TryFrom, + net::SocketAddr, + str::FromStr, + sync::{Arc, RwLock}, +}; +use tokio::task::JoinHandle; +use warp::{ + hyper::StatusCode, + reply::{self, Json, WithStatus}, + Filter, +}; + +#[derive(PartialEq, Eq, Serialize, Copy, Clone, Debug)] +#[serde(rename = "USD")] +#[repr(transparent)] +pub struct USDCentPrice(u64); + +pub struct Frontend { + pub shutdown_trigger: oneshot::Sender<()>, + pub shutdown_handle: JoinHandle<()>, +} + +impl Frontend { + pub async fn new(listening_address: &String, prices_cache: Arc>) -> Self { + let get_asset_id_endpoint = warp::path!("asset_id" / Asset) + .and(warp::get()) + .map(get_asset_id); + + let get_price_endpoint = warp::path!("price" / AssetIndex / u128) + .and(warp::get()) + .map(move |asset_index, _request_id| { + get_price(&prices_cache, asset_index, _request_id) + }); + + let (shutdown_trigger, shutdown) = oneshot::channel::<()>(); + let (_, server) = warp::serve(get_price_endpoint.or(get_asset_id_endpoint)) + .bind_with_graceful_shutdown( + SocketAddr::from_str(listening_address).expect("invalid listening address."), + async { + shutdown.await.ok(); + }, + ); + + let shutdown_handle = tokio::spawn(server); + + Frontend { + shutdown_trigger, + shutdown_handle, + } + } +} + +fn get_asset_id(x: Asset) -> WithStatus { + match AssetIndex::try_from(x) { + Ok(asset_index) => reply::with_status(reply::json(&asset_index), StatusCode::OK), + Err(_) => reply::with_status(reply::json(&()), StatusCode::NOT_FOUND), + } +} + +/* + The oracle pallet is expecting a price in USD cents. + While this server handle any asset pair. + It make this part of code very specific... + Shouldn't we use the unit of value + exponent for any asset pair? + + Also, the price might be outdated, we added the timestamp value to it. + Should the offchain worker handle this or should we put some kind of timeout + here and wipe the cached value? +*/ +fn get_price( + prices: &ThreadSafePriceCache, + asset_index: AssetIndex, + _request_id: u128, +) -> WithStatus { + // TODO: What is the request_id useful for (comming from oracle pallet)? + match Asset::try_from(asset_index).and_then(|asset| { + let now = TimeStamp::now(); + let max_cache_duration = Duration::seconds(10); + prices + .read() + .expect("could not acquire read lock") + .get(&asset) + .copied() + .and_then(|timestamped_price| { + ensure_uptodate_price(&max_cache_duration, &now, ×tamped_price) + }) + .map(get_usd_cent_price) + .ok_or(()) + }) { + Ok(usd_cent_price) => reply::with_status(reply::json(&usd_cent_price), StatusCode::OK), + Err(_) => reply::with_status(reply::json(&()), StatusCode::NOT_FOUND), + } +} + +/* + Ensure that the value was registered less than X seconds ago +*/ +fn ensure_uptodate_price( + &max_cache_duration: &Duration, + current_timestamp: &TimeStamp, + timestamped_price: &TimeStampedPrice, +) -> Option<(Price, Exponent)> { + if current_timestamp.elapsed_since(×tamped_price.timestamp) < max_cache_duration { + Some(timestamped_price.value) + } else { + None + } +} + +fn get_usd_cent_price((Price(p), Exponent(q)): (Price, Exponent)) -> USDCentPrice { + let usd_adjust_cent_exponent = q + 2; + let usd_cent_price = match usd_adjust_cent_exponent.signum() { + 0 => p, + 1 => p * u64::pow(10u64, usd_adjust_cent_exponent as u32), + -1 => p / u64::pow(10u64, usd_adjust_cent_exponent.abs() as u32), + _ => unreachable!(), + }; + USDCentPrice(usd_cent_price) +} + +#[cfg(test)] +mod tests { + use super::{get_usd_cent_price, USDCentPrice}; + use crate::{ + feed::{Exponent, Price, TimeStamp, TimeStamped}, + frontend::ensure_uptodate_price, + }; + use chrono::Duration; + + #[test] + fn test_ensure_uptodate_price() { + let value = (Price(0x1337), Exponent(10)); + [ + ( + ( + Duration::seconds(1), + TimeStamp(1), + TimeStamped { + value, + timestamp: TimeStamp(0), + }, + ), + None, + ), + ( + ( + Duration::seconds(5), + TimeStamp(6), + TimeStamped { + value, + timestamp: TimeStamp(0), + }, + ), + None, + ), + ( + ( + Duration::seconds(20), + TimeStamp(20), + TimeStamped { + value, + timestamp: TimeStamp(1), + }, + ), + Some(value), + ), + ( + ( + Duration::seconds(10), + TimeStamp(14), + TimeStamped { + value, + timestamp: TimeStamp(5), + }, + ), + Some(value), + ), + ] + .iter() + .for_each( + |((max_cache_duration, current_timestamp, timestamped_price), expected)| { + assert_eq!( + ensure_uptodate_price(max_cache_duration, current_timestamp, timestamped_price), + *expected + ); + }, + ) + } + + #[test] + fn test_get_usd_cent_price() { + [ + ((Price(0xCAFEBABE), Exponent(-2)), USDCentPrice(0xCAFEBABE)), + ( + (Price(0xDEADBEEF), Exponent(2)), + USDCentPrice(0xDEADBEEF * u64::pow(10, 2 + 2)), + ), + ((Price(1), Exponent(0)), USDCentPrice(1 * u64::pow(10, 2))), + ( + (Price(12), Exponent(-1)), + USDCentPrice(12 * u64::pow(10, 1)), + ), + ((Price(454000), Exponent(-6)), USDCentPrice(45)), + ] + .iter() + .for_each(|&(price, expected_usd_cent)| { + assert_eq!(get_usd_cent_price(price), expected_usd_cent); + }); + } +} diff --git a/price-feed/src/main.rs b/price-feed/src/main.rs index 1e991dc1db9..35c6f96d080 100644 --- a/price-feed/src/main.rs +++ b/price-feed/src/main.rs @@ -1,86 +1,31 @@ mod asset; +mod backend; +mod cache; mod feed; +mod frontend; mod opts; #[macro_use] extern crate custom_derive; #[macro_use] extern crate enum_derive; +#[macro_use] +extern crate lazy_static; + use crate::{ - asset::{Asset, AssetPair}, - feed::{pyth::Pyth, FeedNotification, Price, TimeStamped}, + asset::Asset, + backend::{Backend, FeedNotificationAction}, + cache::ThreadSafePriceCache, + feed::{pyth, FeedNotification, TimeStampedPrice}, + frontend::Frontend, }; use futures::stream::StreamExt; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; use std::{ collections::HashMap, - net::SocketAddr, - str::FromStr, sync::{Arc, RwLock}, }; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinHandle, -}; -use url::Url; -use warp::{ - hyper::StatusCode, - reply::{json, with_status}, - Filter, -}; - -async fn run_http_frontend( - listening_address: &String, - prices: Arc>>>, -) -> (oneshot::Sender<()>, JoinHandle<()>) { - let get_price = warp::path!("price" / Asset / Asset) - .and(warp::get()) - .map(move |x, y| { - let asset_pair = (x, y); - match prices.read().unwrap().get(&asset_pair) { - Some(latest_price) => with_status(json(latest_price), StatusCode::OK), - // TODO: how to 404 without content??? - None => with_status(json(&"NOT_FOUND"), StatusCode::NOT_FOUND), - } - }); - - // Channel that will allow warp to gracefully shutdown when a signal is comming. - let (tx, rx) = oneshot::channel::<()>(); - let (_, server) = warp::serve(get_price).bind_with_graceful_shutdown( - SocketAddr::from_str(listening_address).expect("invalid listening address."), - async { - rx.await.ok(); - }, - ); - - // Allow us to join the shutdown later. - let server_handle = tokio::spawn(server); - - (tx, server_handle) -} - -// TODO: manage multiple feeds -async fn create_subscriptions(pythd_host: &String) -> (Pyth, mpsc::Receiver) { - /* Its important to drop the initial feed_in as it will be cloned for all subsequent tasks - The received won't get notified if all cloned senders are closed but not the 'main' one. - */ - let (feed_in, feed_out) = mpsc::channel::(128); - - let mut pyth = Pyth::new(&Url::parse(&pythd_host).expect("invalid pythd host address.")) - .await - .expect("connection to pyth-client failed"); - - // TODO: subscribe to all asset prices? cli configurable? - log::info!("successfully connected to pyth-client."); - for &asset in [Asset::BTC, Asset::ETH, Asset::LTC].iter() { - pyth.subscribe_to_asset(&feed_in, (asset, Asset::USD)) - .await - .expect("failed to subscribe to asset"); - } - - (pyth, feed_out) -} #[tokio::main] async fn main() { @@ -88,57 +33,45 @@ async fn main() { let opts = opts::get_opts(); - let (pyth, mut feed_out) = create_subscriptions(&opts.pythd_host).await; + let prices_cache: ThreadSafePriceCache = Arc::new(RwLock::new(HashMap::new())); - let prices: Arc>>> = - Arc::new(RwLock::new(HashMap::new())); + let (pyth, pyth_feed) = pyth::run_full_subscriptions(&opts.pythd_host).await; - let (server_shutdown, server_handle) = - run_http_frontend(&opts.listening_address, prices.clone()).await; + let backend_shutdown_trigger: futures::stream::Fuse = + Signals::new(&[SIGTERM, SIGINT, SIGQUIT]) + .expect("could not create signals stream") + .fuse(); - let mut signals = Signals::new(&[SIGTERM, SIGINT, SIGQUIT]) - .expect("could not create signals stream") - .fuse(); + let backend = Backend::new::< + FeedNotification, + FeedNotificationAction, + _, + _, + _, + >(prices_cache.clone(), pyth_feed, backend_shutdown_trigger) + .await; - let terminate = async { - log::info!("terminating pyth subscriptions..."); - pyth.terminate().await; - log::info!("signaling warp for termination..."); - server_shutdown.send(()).unwrap(); - log::info!("waiting for warp to terminate..."); - server_handle.await.unwrap(); - }; + let frontend = Frontend::new(&opts.listening_address, prices_cache).await; - 'a: loop { - tokio::select! { - _ = signals.next() => { - log::info!("terminating signal received."); - terminate.await; - break 'a; - } - message = feed_out.recv() => { - match message { - Some(notification) => { - log::info!("notification received: {:?}", notification); - /* TODO: how are we going to handles X feeds: - - do we just expose every one of them from their own endpoint? - - do we merge the prices (median?), if so, merging will depend on timestamp? - On notification close, do we remove the price as we are no - longer getting new prices? - */ - if let FeedNotification::PriceUpdated(_, a, p) = notification { - prices.write().expect("failed to acquire write lock...").insert(a, p); - }; - } - None => { - log::info!("no more feed available... exiting handler."); - terminate.await; - break 'a; - } - } - } - } - } + backend + .shutdown_handle + .await + .expect("oop, something went wrong"); + + log::info!("backend terminated, dropping pyth subscriptions"); + pyth.terminate().await; + + log::info!("signaling warp for termination..."); + frontend + .shutdown_trigger + .send(()) + .expect("oop, something went wrong"); + + log::info!("waiting for warp to terminate..."); + frontend + .shutdown_handle + .await + .expect("oop, something went wrong"); log::info!("farewell."); } diff --git a/price-feed/src/opts.rs b/price-feed/src/opts.rs index e53c74e293f..1d5373c6a70 100644 --- a/price-feed/src/opts.rs +++ b/price-feed/src/opts.rs @@ -6,7 +6,7 @@ use clap::{AppSettings, Clap}; pub struct Opts { #[clap(short, long, default_value = "http://127.0.0.1:8910")] pub pythd_host: String, - #[clap(short, long, default_value = "127.0.0.1:8081")] + #[clap(short, long, default_value = "127.0.0.1:3001")] pub listening_address: String }