Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,741 changes: 1,298 additions & 443 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ panic = 'unwind'
members = [
'node',
'runtime',
'price-feed',
]
23 changes: 23 additions & 0 deletions price-feed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "price-feed"
version = "0.1.0"
edition = "2018"

[dependencies]
enum_derive = "0.1.7"
custom_derive = "0.1.7"
env_logger = "0.9.0"
log = "0.4.14"
tokio = { version = "1.9", features = [ "full" ] }
url = "1.7.0"
chrono = "0.4.19"
serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = "18.0.0"
jsonrpc-client-transports = "18.0.0"
clap = "3.0.0-beta.2"
warp = "0.3"
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = [ "futures-v0_3" ] }
futures = "0.3"
47 changes: 47 additions & 0 deletions price-feed/run_pyth.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
with import <nixpkgs> {};
let
pythd = stdenv.mkDerivation rec {
name = "pyth-daemon-${version}";
version = "2.1";
buildInputs = [ cmake zlib.dev libudev openssl.dev zstd.dev ];
src = pkgs.fetchFromGitHub {
repo = "pyth-client";
owner = "hussein-aitlahcen";
rev = "update-jsonrpc";
sha256 = "sha256:1ca8z33pnn6x9dkxii70s1lcskh56fzng1x9lqxzk84q5fffysdb";
};
configurePhase = ''
mkdir build
cd build
cmake ..
'';
buildPhase = ''
make
'';
installPhase = ''
mkdir -p $out/bin
mv pythd $out/bin
mv pyth_tx $out/bin
'';
};
in mkShell {
packages = [ pythd ];
SOLANA_ENV = "devnet";
shellHook = ''
echo "Running up pyth_tx & puthd"
export PYTH_TX_LOG=$(mktemp)
pyth_tx -l $PYTH_TX_LOG -d -r api.$SOLANA_ENV.solana.com &
export PYTH_TX_PID=$!
export PYTHD_LOG=$(mktemp)
pythd -l $PYTHD_LOG -d -r api.$SOLANA_ENV.solana.com &
export PYTHD_PID=$!
teardown() {
echo "Shuting down pyth_tx & pythd";
kill -2 $PYTHD_PID
kill -2 $PYTH_TX_PID
rm $PYTH_TX_LOG
rm $PYTHD_LOG
}
trap teardown EXIT
'';
}
15 changes: 15 additions & 0 deletions price-feed/src/asset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
custom_derive! {
#[derive(EnumFromStr, Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum Asset {
BTC,
ETH,
LTC,
USD,
}
}

pub type AssetPair = (Asset, Asset);

pub fn symbol((x, y): AssetPair) -> String {
format!("{:?}/{:?}", x, y)
}
29 changes: 29 additions & 0 deletions price-feed/src/feed/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
pub mod pyth;
use serde::Serialize;

use crate::asset::AssetPair;

#[derive(Serialize, Copy, Clone, Debug)]
pub struct TimeStamp(i64);

#[derive(Serialize, Copy, Clone, Debug)]
#[repr(transparent)]
pub struct Price(u64);

#[derive(Serialize, Copy, Clone, Debug)]
pub struct TimeStamped<T> {
pub value: T,
pub timestamp: TimeStamp,
}

#[derive(PartialEq, Eq, Hash, Debug)]
pub enum Feed {
Pyth,
}

#[derive(Debug)]
pub enum FeedNotification {
Opened(Feed, AssetPair),
Closed(Feed, AssetPair),
PriceUpdated(Feed, AssetPair, TimeStamped<Price>),
}
165 changes: 165 additions & 0 deletions price-feed/src/feed/pyth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use super::{Feed, FeedNotification, Price, TimeStamped};
use crate::{
asset::{symbol, AssetPair},
feed::TimeStamp,
};
use chrono::Utc;
use jsonrpc_client_transports::{
transports::ws::connect, RpcError, TypedClient, TypedSubscriptionStream,
};
use jsonrpc_core_client::futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc::{self, error::SendError},
task::JoinHandle,
};

#[derive(Debug, Deserialize)]
struct PythNotification {
price: u64,
}

#[derive(Clone, Debug, Deserialize)]
struct PythProductPrice {
account: String,
}

#[derive(Clone, Debug, Deserialize)]
struct PythProductAttributes {
symbol: String,
}

#[derive(Clone, Debug, Deserialize)]
struct PythProduct {
account: String,
attr_dict: PythProductAttributes,
price: Vec<PythProductPrice>,
}

#[derive(Debug)]
pub enum PythError {
RpcError(RpcError),
ChannelError(SendError<FeedNotification>),
}

#[derive(Serialize)]
struct PythSubscribeParams {
account: String,
}

pub struct Pyth {
client: TypedClient,
handles: Vec<JoinHandle<Result<(), PythError>>>,
}

impl Pyth {
pub async fn new(url: &url::Url) -> Result<Pyth, PythError> {
let client = connect::<TypedClient>(url)
.await
.map_err(|e| PythError::RpcError(e))?;
Ok(Pyth {
client,
handles: Vec::new(),
})
}

async fn get_product_list(&self) -> Result<Vec<PythProduct>, PythError> {
let result = self
.client
.call_method::<(), Vec<PythProduct>>("get_product_list", "", ())
.await
.map_err(|e| PythError::RpcError(e))?;
log::info!("Products: {:?}", result);
Ok(result)
}

async fn subscribe(
&mut self,
output: mpsc::Sender<FeedNotification>,
asset_pair: AssetPair,
account: String,
) -> Result<(), PythError> {
log::info!(
"Subscribing to asset pair {:?} from account {:?}",
asset_pair,
account
);
let mut stream: TypedSubscriptionStream<PythNotification> = self
.client
.subscribe(
"subscribe_price",
[PythSubscribeParams {
account: account.to_string(),
}],
"notify_price",
"",
"",
)
.map_err(|e| PythError::RpcError(e))?;
let join_handle = tokio::spawn(async move {
output
.send(FeedNotification::Opened(Feed::Pyth, asset_pair))
.await
.map_err(|e| PythError::ChannelError(e))?;
'a: loop {
match stream.next().await {
Some(notification) => match notification {
Ok(price_notification) => {
log::info!(
"received price, {:?} = {:?}",
asset_pair,
price_notification
);
output
.send(FeedNotification::PriceUpdated(
Feed::Pyth,
asset_pair,
TimeStamped {
value: Price(price_notification.price),
timestamp: TimeStamp(Utc::now().timestamp()),
},
))
.await
.map_err(|e| PythError::ChannelError(e))?;
}
_ => {
log::warn!("invalid notification?: {:?}", notification);
}
},
None => break 'a,
}
}
output
.send(FeedNotification::Closed(Feed::Pyth, asset_pair))
.await
.map_err(|e| PythError::ChannelError(e))?;
Ok(())
});
self.handles.push(join_handle);
Ok(())
}

pub async fn subscribe_to_asset(
&mut self,
output: &mpsc::Sender<FeedNotification>,
asset_pair: AssetPair,
) -> Result<(), PythError> {
let asset_pair_symbol = symbol(asset_pair);
let products = self.get_product_list().await?;
let price_accounts = products
.iter()
.filter(|p| p.attr_dict.symbol == asset_pair_symbol)
.flat_map(|p| p.price.clone())
.map(|p| p.account.clone())
.collect::<Vec<_>>();
log::info!("Accounts for {:?}: {:?}", asset_pair_symbol, price_accounts);
for account in price_accounts {
self.subscribe(output.clone(), asset_pair, account).await?
}
Ok(())
}

pub async fn terminate(&self) {
self.handles.iter().for_each(drop);
}
}
Loading