Skip to content

Commit

Permalink
Merge pull request #11 from blockworks-foundation/max/fix-crank
Browse files Browse the repository at this point in the history
upgrade mango-feeds-connector after fixing account write filter bug
  • Loading branch information
godmodegalactus authored Apr 6, 2023
2 parents 6d96d6d + 326d167 commit 5bea723
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 430 deletions.
492 changes: 182 additions & 310 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 16 additions & 28 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,32 @@ license = "Apache-2.0"
homepage = "mango.markets"
publish = false

rust-version = "1.66.1"

[dependencies]
anyhow = "1.0"
arrayref = "*"
async-std = "1.12.0"
async-channel = "1.6"
async-trait = "0.1.66"
borsh = "0.9.3"
bytemuck = "1.7.2"
chrono = "0.4.19"
clap = "2.33.1"
csv-async = "1.2"
dashmap = "5.4.0"
fixed = { version = ">=1.11.0, <1.12.0", features = ["serde"] }
fixed-macro = "^1.1.1"
multiqueue = "^0.3.2"
futures = "0.3.17"
iter_tools = "0.1.4"
log = "0.4.14"
multiqueue = "^0.3.2"
rand = ">=0.8.5"
rayon = "1.5.1"
serde = "1.0.136"
serde_derive = "1.0.103"
serde_json = "1.0.79"
serde_yaml = "0.8.23"
iter_tools = "0.1.4"
dashmap = "5.4.0"

mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }
yellowstone-grpc-proto = "1.0.1"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }

solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
Expand All @@ -52,25 +55,10 @@ solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", ta
# we have a bunch of helpers to convert between the two explicitly
solana-program = "1.9.17"

thiserror = "1.0"
csv-async = "1.2"
async-std = "1.12.0"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
async-channel = "1.6"
async-trait = "0.1.66"
prost = "0.11"
warp = "0.3"
futures = "0.3.17"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
arrayref = "*"
bytemuck = "1.7.2"
toml = "*"
mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", branch = "ckamm/solana-versions2", default-features = false, features = ["solana-1-15"] }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

[patch.crates-io]
# for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }
29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,43 @@ For the best results to avoid limits by quic it is better to fill the argument "

## Build

Clone repo
Install configure-mango
```sh
https://github.com/blockworks-foundation/mango-simulation.git
git clone https://github.com/godmodegalactus/configure_mango.git
cd configure_mango
yarn install
sh scripts/configure_local.sh

# open a new terminal as the previous one will continue running a solana validator
# this command will hang for around a minute, just wait for it to finish
NB_USERS=50 yarn ts-node index.ts

```

Build
Install mango-simulation
```sh
git clone https://github.com/blockworks-foundation/mango-simulation.git
cd mango-simulation
cargo build

# copy over files from configure_mango while you wait for the build to finish
mkdir -p localnet
cp ../configure_mango/ids.json localnet
cp ../configure_mango/accounts.json localnet
cp ../configure_mango/authority.json localnet
cp ../configure_mango/config/validator-identity.json localnet
```

## Run


To run against your local validator:
```sh
cargo run --bin mango-simulation -- -u http://localhost:8899 --identity validator-identity.json --keeper-authority authority.json --accounts accounts.json --mango ids.json --mango-cluster localnet --duration 10 -q 2 --transaction_save_file tlog.csv --block_data_save_file blog.csv
cargo run --bin mango-simulation -- -u http://127.0.0.1:8899 --identity localnet/validator-identity.json --keeper-authority localnet/authority.json --accounts localnet/accounts.json --mango localnet/ids.json --mango-cluster localnet --duration 10 -q 2 --transaction-save-file tlog.csv --block-data-save-file blog.csv
```

You can also run the simulation against testnet, but you will need to run configure_mango

Details for each argument:
```
USAGE:
Expand Down
91 changes: 45 additions & 46 deletions src/confirmation_strategies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use solana_transaction_status::{

use crate::states::{BlockData, TransactionConfirmRecord, TransactionSendRecord};

use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant, sync::broadcast::Sender};
use tokio::{
sync::broadcast::Sender, sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant,
};

pub async fn process_blocks(
block: &UiConfirmedBlock,
Expand Down Expand Up @@ -73,33 +75,31 @@ pub async fn process_blocks(
let transaction_record = transaction_record.0;
mm_transaction_count += 1;

match tx_confirm_records
.send(TransactionConfirmRecord {
signature: transaction_record.signature.to_string(),
confirmed_slot: Some(slot),
confirmed_at: Some(Utc::now().to_string()),
sent_at: transaction_record.sent_at.to_string(),
sent_slot: transaction_record.sent_slot,
successful: if let Some(meta) = &meta {
meta.status.is_ok()
} else {
false
},
error: if let Some(meta) = &meta {
meta.err.as_ref().map(|x| x.to_string())
} else {
None
},
block_hash: Some(block.blockhash.clone()),
market: transaction_record.market.map(|x| x.to_string()),
market_maker: transaction_record.market_maker.map(|x| x.to_string()),
keeper_instruction: transaction_record.keeper_instruction,
slot_processed: Some(slot),
slot_leader: Some(slot_leader.clone()),
timed_out: false,
priority_fees: transaction_record.priority_fees,
})
{
match tx_confirm_records.send(TransactionConfirmRecord {
signature: transaction_record.signature.to_string(),
confirmed_slot: Some(slot),
confirmed_at: Some(Utc::now().to_string()),
sent_at: transaction_record.sent_at.to_string(),
sent_slot: transaction_record.sent_slot,
successful: if let Some(meta) = &meta {
meta.status.is_ok()
} else {
false
},
error: if let Some(meta) = &meta {
meta.err.as_ref().map(|x| x.to_string())
} else {
None
},
block_hash: Some(block.blockhash.clone()),
market: transaction_record.market.map(|x| x.to_string()),
market_maker: transaction_record.market_maker.map(|x| x.to_string()),
keeper_instruction: transaction_record.keeper_instruction,
slot_processed: Some(slot),
slot_leader: Some(slot_leader.clone()),
timed_out: false,
priority_fees: transaction_record.priority_fees,
}) {
Ok(_) => {}
Err(e) => {
warn!("Tx confirm record channel broken {}", e.to_string());
Expand Down Expand Up @@ -191,24 +191,23 @@ pub fn confirmations_by_blocks(

// add to timeout if not retaining
if remove {
let _ = tx_confirm_records
.send(TransactionConfirmRecord {
signature: signature.to_string(),
confirmed_slot: None,
confirmed_at: None,
sent_at: sent_record.sent_at.to_string(),
sent_slot: sent_record.sent_slot,
successful: false,
error: Some("timeout".to_string()),
block_hash: None,
market: sent_record.market.map(|x| x.to_string()),
market_maker: sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: sent_record.keeper_instruction.clone(),
slot_processed: None,
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
});
let _ = tx_confirm_records.send(TransactionConfirmRecord {
signature: signature.to_string(),
confirmed_slot: None,
confirmed_at: None,
sent_at: sent_record.sent_at.to_string(),
sent_slot: sent_record.sent_slot,
successful: false,
error: Some("timeout".to_string()),
block_hash: None,
market: sent_record.market.map(|x| x.to_string()),
market_maker: sent_record.market_maker.map(|x| x.to_string()),
keeper_instruction: sent_record.keeper_instruction.clone(),
slot_processed: None,
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
});
to_remove.push(signature.clone());
}
}
Expand Down
23 changes: 14 additions & 9 deletions src/crank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use crate::{

use mango_feeds_connector::{
account_write_filter::{self, AccountWriteRoute},
FilterConfig,
websocket_source,
metrics, SourceConfig,
SnapshotSourceConfig, MetricsConfig,
metrics, websocket_source, FilterConfig, MetricsConfig, SnapshotSourceConfig, SourceConfig,
};

use async_channel::unbounded;
Expand All @@ -31,16 +28,13 @@ use std::{
};
use tokio::sync::RwLock;



#[derive(Debug, Clone)]
pub struct KeeperConfig {
pub program_id: Pubkey,
pub rpc_url: String,
pub websocket_url: String,
}


pub fn start(
config: KeeperConfig,
exit_signal: Arc<AtomicBool>,
Expand Down Expand Up @@ -123,7 +117,11 @@ pub fn start(
let routes = vec![AccountWriteRoute {
matched_pubkeys: perp_queue_pks
.iter()
.map(|(_, evq_pk)| mango_feeds_connector::solana_sdk::pubkey::Pubkey::new_from_array(evq_pk.to_bytes()))
.map(|(_, evq_pk)| {
mango_feeds_connector::solana_sdk::pubkey::Pubkey::new_from_array(
evq_pk.to_bytes(),
)
})
.collect(),
sink: Arc::new(MangoV3PerpCrankSink::new(
perp_queue_pks,
Expand All @@ -135,10 +133,12 @@ pub fn start(
timeout_interval: Duration::default(),
}];

info!("matched_pks={:?}", routes[0].matched_pubkeys);

let (account_write_queue_sender, slot_queue_sender) =
account_write_filter::init(routes, metrics_tx.clone()).expect("filter initializes");

info!("start processing grpc events");
// info!("start processing grpc events");

// grpc_plugin_source::process_events(
// &config,
Expand All @@ -148,6 +148,11 @@ pub fn start(
// metrics_tx.clone(),
// ).await;

info!(
"start processing websocket events program_id={:?} ws_url={:?}",
config.program_id, config.websocket_url
);

websocket_source::process_events(
&SourceConfig {
dedup_queue_size: 0,
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

pub mod cli;
pub mod confirmation_strategies;
pub mod crank;
Expand Down
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
log::info,
serde_json,
mango_simulation::{
cli,
confirmation_strategies::confirmations_by_blocks,
Expand All @@ -17,6 +16,7 @@ use {
stats::MangoSimulationStats,
tpu_manager::TpuManager,
},
serde_json,
solana_client::{nonblocking::rpc_client::RpcClient as NbRpcClient, rpc_client::RpcClient},
solana_program::pubkey::Pubkey,
solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair},
Expand Down Expand Up @@ -215,8 +215,7 @@ pub async fn main() -> anyhow::Result<()> {
);
tasks.append(&mut writers_jh);

let stats_handle =
mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe(), exit_signal.clone());
let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe());
tasks.push(stats_handle);

let mut confirmation_threads = confirmations_by_blocks(
Expand Down
18 changes: 8 additions & 10 deletions src/mango_v3_perp_crank_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,9 @@ use solana_sdk::{instruction::Instruction, pubkey::Pubkey};

use bytemuck::cast_ref;

use mango_feeds_connector::{
account_write_filter::AccountWriteSink,
chain_data::AccountData,
};
use mango_feeds_connector::{account_write_filter::AccountWriteSink, chain_data::AccountData};

use crate::{
helpers::{to_sdk_instruction, to_sp_pk},
};
use crate::helpers::{to_sdk_instruction, to_sp_pk};

const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10;
Expand Down Expand Up @@ -61,7 +56,11 @@ type EventQueueEvents = [AnyEvent; QUEUE_LEN];

#[async_trait]
impl AccountWriteSink for MangoV3PerpCrankSink {
async fn process(&self, pk: &mango_feeds_connector::solana_sdk::pubkey::Pubkey, account: &AccountData) -> Result<(), String> {
async fn process(
&self,
pk: &mango_feeds_connector::solana_sdk::pubkey::Pubkey,
account: &AccountData,
) -> Result<(), String> {
let account = &account.account;

let (ix, mkt_pk): (Result<Instruction, String>, Pubkey) = {
Expand Down Expand Up @@ -112,7 +111,6 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
)
.collect();


let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
let mkt_pk = self
.mkt_pks_by_evq_pks
Expand All @@ -125,7 +123,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
&to_sp_pk(&self.group_pk),
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(&pk),
&to_sp_pk(&pk),
&mut mango_accounts,
MAX_EVENTS_PER_TX,
)
Expand Down
Loading

0 comments on commit 5bea723

Please sign in to comment.