v7-dev (#120)
* add a timeout on requests from the sidecar to the engine (see the first sketch below)

* generate the auto-increment order id inside the orderbook, replacing the MySQL-based id (see the second sketch below)

* remove the external order id from commands

* add PendingOrders to the core data, so the coredump has to be migrated to v2

* sidecar: receive order updates from galois directly

---------

Co-authored-by: Cyberaurora <[email protected]>
kb1ns authored Nov 14, 2023
1 parent c1dd0cf commit bfad0f9
Showing 23 changed files with 590 additions and 493 deletions.
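The first bullet of the commit message adds a timeout to requests the sidecar sends to the engine; the affected sidecar file is not among the hunks shown below, so the following is only a minimal sketch of the idea, assuming a tokio-based async sidecar. `query_engine`, the payload type, and the 10-second budget are placeholders, not the real galois API.

```rust
use std::time::Duration;
use tokio::time::timeout;

// Placeholder for the real sidecar -> engine request; it just echoes the payload here.
async fn query_engine(payload: Vec<u8>) -> std::io::Result<Vec<u8>> {
    Ok(payload)
}

// Wrap the request so a stalled engine connection cannot block the sidecar forever.
async fn query_engine_with_timeout(payload: Vec<u8>) -> anyhow::Result<Vec<u8>> {
    match timeout(Duration::from_secs(10), query_engine(payload)).await {
        Ok(reply) => Ok(reply?),
        Err(_elapsed) => anyhow::bail!("engine did not reply within 10s"),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reply = query_engine_with_timeout(b"ping".to_vec()).await?;
    assert_eq!(reply, b"ping");
    Ok(())
}
```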
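The second bullet replaces the MySQL auto-increment column as the source of order ids: the orderbook itself now hands them out, which is why the tests in engine/src/executor/clearing.rs below drop the explicit id argument. A minimal sketch of the idea with a plain u64 counter; the struct and method names are illustrative stand-ins, not the actual OrderBook fields.

```rust
// Illustrative stand-in for the real orderbook: the point is only that the id
// counter lives inside the book, so it is captured by coredumps/checkpoints and
// no external database is needed to allocate ids.
pub struct OrderBook {
    next_order_id: u64,
}

impl OrderBook {
    pub fn new() -> Self {
        Self { next_order_id: 1 }
    }

    /// Allocate the next monotonically increasing order id.
    pub fn incr_order_id(&mut self) -> u64 {
        let id = self.next_order_id;
        self.next_order_id += 1;
        id
    }
}

fn main() {
    let mut book = OrderBook::new();
    assert_eq!(book.incr_order_id(), 1);
    assert_eq!(book.incr_order_id(), 2);
}
```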
14 changes: 7 additions & 7 deletions bin/src/galois.rs
@@ -33,7 +33,7 @@ fn print_banner() {
/// Overview:
///
/// sidecar chain <-+
/// | | \
/// ^ | \
/// | | \
/// v v \
/// +----> server scanner +
@@ -48,11 +48,11 @@ fn print_banner() {
/// | | |
/// | | |
/// | v |
/// + storage |
/// \ / \ +
/// \ / \ /
/// \ / \ /
/// +-- replyer committer -+
/// + rocksdb +
/// \ / \ /
/// \ / \ /
/// \ / \ /
/// +-- market committer -+
///
fn start() {
let (id, coredump) = snapshot::load().unwrap();
@@ -62,7 +62,7 @@ fn start() {
let (event_tx, event_rx) = std::sync::mpsc::channel();
let (input_tx, input_rx) = std::sync::mpsc::channel();
let (reply_tx, reply_rx) = std::sync::mpsc::channel();
output::init(output_rx);
market::init(output_rx, reply_tx.clone());
committer::init(connector.clone(), state.clone());
executor::init(event_rx, output_tx, reply_tx.clone(), coredump);
sequencer::init(input_rx, event_tx, reply_tx, id);
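The hunk above rewires start(): the old output module becomes market and now also receives a reply sender. The real init signatures live in the respective modules and are not shown here; the sketch below only illustrates the std::sync::mpsc fan-out pattern that the doc-comment diagram describes, with placeholder message types.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // sequencer -> executor -> market, mirroring the arrows in the diagram above.
    let (event_tx, event_rx) = mpsc::channel::<String>();
    let (output_tx, output_rx) = mpsc::channel::<String>();

    // "executor": consumes sequenced events and emits matching outputs.
    thread::spawn(move || {
        for event in event_rx {
            output_tx.send(format!("matched:{event}")).unwrap();
        }
    });

    // "market" (formerly `output`): consumes the outputs, e.g. to persist them.
    let market = thread::spawn(move || {
        for out in output_rx {
            println!("market received {out}");
        }
    });

    event_tx.send("order#1".to_string()).unwrap();
    drop(event_tx); // closing the channel lets both threads drain and exit
    market.join().unwrap();
}
```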
3 changes: 3 additions & 0 deletions engine/Cargo.toml
@@ -7,6 +7,9 @@ license = "Apache-2.0"
repository = "https://github.com/uinb/galois"
description = "High performance matching system"

[features]
v1-to-v2 = []

[dependencies]
rust_decimal = { version = "1.22", features = ["serde-bincode"] }
bincode = "1.3.1"
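The new v1-to-v2 cargo feature gates the coredump-migration code added to engine/src/core.rs further down. A minimal sketch of how such a feature gate is typically consumed; `migrate_coredump` is a hypothetical helper (the real migration path is the v1 module in core.rs), and one would enable it with something like `cargo build --features v1-to-v2`.

```rust
// Compiled only when the migration feature is enabled.
#[cfg(feature = "v1-to-v2")]
fn migrate_coredump() {
    println!("converting the v1 coredump to the v2 layout");
}

// No-op in normal builds, so the call site stays unconditional.
#[cfg(not(feature = "v1-to-v2"))]
fn migrate_coredump() {}

fn main() {
    migrate_coredump();
}
```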
16 changes: 12 additions & 4 deletions engine/src/config.rs
@@ -66,12 +66,20 @@ pub struct ServerConfig {
}

impl ServerConfig {
pub fn get_coredump_path(&self) -> String {
format!("{}/coredump/", self.data_home)
pub fn get_checkpoint_path(&self) -> String {
format!("{}/checkpoint/", self.data_home)
}

pub fn get_storage_path(&self) -> String {
format!("{}/storage/", self.data_home)
pub fn get_sequence_path(&self) -> String {
format!("{}/sequence/", self.data_home)
}

pub fn get_proof_path(&self) -> String {
format!("{}/proof/", self.data_home)
}

pub fn get_output_path(&self) -> String {
format!("{}/market/", self.data_home)
}
}

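The storage layout moves from a single storage/ directory (plus coredump/) to one directory per concern under data_home. A small usage sketch, assuming a stripped-down ServerConfig with only the data_home field; the real struct has more fields and is deserialized from the config file.

```rust
struct ServerConfig {
    data_home: String,
}

impl ServerConfig {
    fn get_checkpoint_path(&self) -> String {
        format!("{}/checkpoint/", self.data_home)
    }
    fn get_sequence_path(&self) -> String {
        format!("{}/sequence/", self.data_home)
    }
    fn get_output_path(&self) -> String {
        format!("{}/market/", self.data_home)
    }
}

fn main() {
    let c = ServerConfig { data_home: "/var/galois".to_string() };
    // Each subsystem gets its own directory, so core.rs (next file) can open a
    // dedicated RocksDB instance per path instead of one shared `storage/` DB.
    assert_eq!(c.get_checkpoint_path(), "/var/galois/checkpoint/");
    assert_eq!(c.get_sequence_path(), "/var/galois/sequence/");
    assert_eq!(c.get_output_path(), "/var/galois/market/");
}
```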
89 changes: 68 additions & 21 deletions engine/src/core.rs
@@ -17,8 +17,8 @@ pub use crate::{
fusotao::GlobalStates,
input::InOrOut,
matcher::{Role, State as OrderState},
orderbook::AskOrBid,
orderbook::OrderBook,
orderbook::{AskOrBid, OrderBook},
orders::{PendingOrder, UserOrders},
};
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
use indexmap::IndexSet;
@@ -32,7 +32,9 @@ use std::{
};

lazy_static::lazy_static! {
pub static ref STORAGE: rocksdb::DB = rocksdb::DB::open_default(&crate::C.server.get_storage_path()).unwrap();
pub static ref SEQ_STORE: rocksdb::DB = rocksdb::DB::open_default(&crate::C.server.get_sequence_path()).unwrap();
pub static ref PROOF_STORE: rocksdb::DB = rocksdb::DB::open_default(&crate::C.server.get_proof_path()).unwrap();
pub static ref OUTPUT_STORE: rocksdb::DB = rocksdb::DB::open_default(&crate::C.server.get_output_path()).unwrap();
}

pub type Base = u32;
@@ -161,36 +163,20 @@ pub fn max_number() -> Amount {
u64::MAX.into()
}

pub const MAX_PENDING_ORDERS_PER_USER: usize = 100;

// we only keep the last 1000 transfer_in/out receipts to remove duplicates
const RECEIPTS_RECORDS_CAPACITY: usize = 1000;
const MAX_OPEN_ORDERS_PER_SYMBOL: usize = 100;

#[derive(Clone, Debug)]
pub struct PendingOrder {
order_id: u64,
symbol: Symbol,
direction: u8,
create_timestamp: u64,
amount: String,
price: String,
status: u8,
matched_quote_amount: String,
matched_base_amount: String,
base_fee: Decimal,
quote_fee: Decimal,
}

#[derive(Clone, Debug)]
pub struct Ephemeral {
onchain_receipt_records: IndexSet<(u32, UserId)>,
user_pending_orders: HashMap<(Symbol, UserId), HashMap<OrderId, PendingOrder>>,
}

impl Ephemeral {
pub fn new() -> Self {
Self {
onchain_receipt_records: IndexSet::with_capacity(RECEIPTS_RECORDS_CAPACITY),
user_pending_orders: HashMap::new(),
}
}

@@ -209,6 +195,7 @@ pub struct Data {
pub merkle_tree: GlobalStates,
pub current_event_id: u64,
pub tvl: Amount,
pub orders: UserOrders,
}

impl Data {
@@ -219,6 +206,7 @@ impl Data {
merkle_tree: GlobalStates::default(),
current_event_id: 0,
tvl: Amount::zero(),
orders: UserOrders::new(),
}
}

@@ -236,6 +224,65 @@ impl Data {
}
}

#[cfg(feature = "v1-to-v2")]
pub mod v1 {
use super::*;

#[derive(Clone, Serialize, Deserialize)]
pub struct DataV1 {
pub orderbooks: HashMap<Symbol, OrderBook>,
pub accounts: Accounts,
pub merkle_tree: GlobalStates,
pub current_event_id: u64,
pub tvl: Amount,
}

impl DataV1 {
pub fn new() -> Self {
Self {
orderbooks: HashMap::new(),
accounts: HashMap::new(),
merkle_tree: GlobalStates::default(),
current_event_id: 0,
tvl: Amount::zero(),
}
}

pub fn from_raw(file: File) -> anyhow::Result<Self> {
let reader = BufReader::new(file);
let mut decompress = ZlibDecoder::new(reader);
Ok(bincode::deserialize_from(&mut decompress)?)
}

pub fn into_raw(&self, file: File) -> anyhow::Result<()> {
let writer = BufWriter::new(file);
let mut compress = ZlibEncoder::new(writer, Compression::best());
bincode::serialize_into(&mut compress, &self)?;
Ok(())
}
}

impl TryInto<Data> for (DataV1, Vec<PendingOrder>) {
type Error = anyhow::Error;

fn try_into(self) -> Result<Data, Self::Error> {
let (data, pending_orders) = self;
let mut orders = UserOrders::new();
pending_orders.into_iter().for_each(|order| {
orders.insert(order);
});
Ok(Data {
orderbooks: data.orderbooks,
accounts: data.accounts,
merkle_tree: data.merkle_tree,
current_event_id: data.current_event_id,
tvl: data.tvl,
orders,
})
}
}
}

#[test]
pub fn test_dump() {
use crate::orderbook::Order;
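The v1 module above provides the two pieces needed for the migration behind the v1-to-v2 feature: DataV1::from_raw to decode an old compressed coredump, and the TryInto<Data> impl for (DataV1, Vec<PendingOrder>) to fold the pending orders (previously kept in MySQL) into the new in-core UserOrders. A sketch of how a migration step might chain them inside this crate; how the pending orders are fetched and where the resulting Data is written back are not shown in this diff, so the function below is hypothetical.

```rust
#[cfg(feature = "v1-to-v2")]
fn migrate_v1_coredump(
    old_snapshot: std::fs::File,
    pending_orders: Vec<PendingOrder>,
) -> anyhow::Result<Data> {
    // Decode the old, zlib-compressed v1 coredump ...
    let old = v1::DataV1::from_raw(old_snapshot)?;
    // ... then convert it, moving the pending orders into Data::orders.
    let migrated: Data = (old, pending_orders).try_into()?;
    Ok(migrated)
}
```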
7 changes: 0 additions & 7 deletions engine/src/executor/clearing.rs
@@ -543,7 +543,6 @@ pub mod test {
assert_eq!(dec!(0.5), b2_101.available);
let mr = Match {
maker: vec![],
// TODO tag here
// taker: Taker::taker_placed(
taker: Taker::cancel(
UserId::from_low_u64_be(2),
@@ -640,7 +639,6 @@ pub mod test {
execute_limit(
&mut book,
UserId::from_low_u64_be(1),
1,
price,
amount,
AskOrBid::Ask,
@@ -661,7 +659,6 @@ pub mod test {
let mr = execute_limit(
&mut book,
UserId::from_low_u64_be(1),
2,
price,
amount,
AskOrBid::Bid,
@@ -718,7 +715,6 @@ pub mod test {
execute_limit(
&mut book,
UserId::from_low_u64_be(1),
1,
price,
amount,
AskOrBid::Ask,
@@ -736,7 +732,6 @@ pub mod test {
let mr = execute_limit(
&mut book,
UserId::from_low_u64_be(2),
2,
price,
amount,
AskOrBid::Bid,
@@ -809,7 +804,6 @@ pub mod test {
execute_limit(
&mut book,
UserId::from_low_u64_be(2),
1,
price,
amount,
AskOrBid::Bid,
@@ -821,7 +815,6 @@ pub mod test {
let mr = execute_limit(
&mut book,
UserId::from_low_u64_be(1),
2,
price,
amount,
AskOrBid::Ask,
