Skip to content

Commit

Permalink
Add failover RPC support.
Browse files Browse the repository at this point in the history
* Remove the `rpc_host` and `rpc_port` fields from the configuration file
  format and add `primary_rpc_host`, `primary_rpc_port`, `failover_rpc_host`,
  and `failover_rpc_port`.
* Add the `RpcUrl` and `RpcUrlKind` types.
  * `RpcUrl` contains a host name and a port.
  * `RpcUrlKind` has variants for primary and failover urls and contains a
    `RpcUrl`.
* Remove `rpc_host` and `rpc_port` and add primary and failover RPC url fields
  to `config::Config` et. al.
* Update `app::Connections::new_http` to automatically use the failover url in
  the event of an error connecting to the primary.
* Add the `home_url` and `foreign_url` to `Connections` containing the
  url/kind (a `RpcUrlKind`) currently in use.
* Update logging.
* Update tests.
  • Loading branch information
c0gent committed Jun 19, 2018
1 parent 69474ce commit 9cc6814
Show file tree
Hide file tree
Showing 11 changed files with 324 additions and 91 deletions.
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ and they can convert them back as well.

![withdraw](./res/withdraw.png)

### How to build
### How to build

Requires `rust` and `cargo`: [installation instructions.](https://www.rust-lang.org/en-US/install.html)

Expand Down Expand Up @@ -83,16 +83,20 @@ keystore = "/path/to/keystore"

[home]
account = "0x006e27b6a72e1f34c626762f3c4761547aff1421"
rpc_host = "http://localhost"
rpc_port = 8545
primary_rpc_host = "http://localhost"
primary_rpc_port = 8545
failover_rpc_host = "http://localhost"
failover_rpc_port = 8546
required_confirmations = 0
password = "home_password.txt"
default_gas_price = 1_000_000_000 # 1 GWEI

[foreign]
account = "0x006e27b6a72e1f34c626762f3c4761547aff1421"
rpc_host = "http://localhost"
rpc_port = 9545
primary_rpc_host = "http://localhost"
primary_rpc_port = 9545
failover_rpc_host = "http://localhost"
failover_rpc_port = 9546
required_confirmations = 0
gas_price_oracle_url = "https://gasprice.poa.network"
gas_price_speed = "instant"
Expand All @@ -110,13 +114,15 @@ withdraw_confirm = { gas = 3000000 }

#### Options

- `keystore` - path to a keystore directory with JSON keys
- `keystore` - path to a keystore directory with JSON keys

#### home/foreign options

- `home/foreign.account` - authority address on the home (**required**)
- `home/foreign.rpc_host` - RPC host (**required**)
- `home/foreign.rpc_port` - RPC port (**defaults to 8545**)
- `home/foreign.primary_rpc_host` - Primary RPC host (**required**)
- `home/foreign.primary_rpc_port` - Primary RPC port (**defaults to 8545**)
- `home/foreign.failover_rpc_host` - Failover RPC host used in the event the primary RPC host is not available. Must be specified if `failover_rpc_port` is set.
- `home/foreign.failover_rpc_port` - Failover RPC port used in the event the primary RPC host is not available. (**defaults to 8545** if `failover_rpc_host` is set.)
- `home/foreign.required_confirmations` - number of confirmation required to consider transaction final on home (default: **12**)
- `home/foreign.poll_interval` - specify how often home node should be polled for changes (in seconds, default: **1**)
- `home/foreign.request_timeout` - specify request timeout (in seconds, default: **3600**)
Expand Down
93 changes: 66 additions & 27 deletions bridge/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::path::{Path, PathBuf};
use tokio_core::reactor::{Handle};
use tokio_timer::{self, Timer};
use web3::Transport;
use error::{Error, ResultExt, ErrorKind};
use config::Config;
use error::{Error, ErrorKind};
use config::{Config, RpcUrl, RpcUrlKind};
use contracts::{home, foreign};
use web3::transports::http::Http;
use web3::{Transport, transports::http::Http, error::Error as Web3Error};
use std::time::Duration;

use std::sync::Arc;
Expand All @@ -27,54 +26,93 @@ pub struct App<T> where T: Transport {

pub struct Connections<T> where T: Transport {
pub home: T,
pub home_url: RpcUrlKind,
pub foreign: T,
pub foreign_url: RpcUrlKind,
}

impl Connections<Http> {
pub fn new_http(handle: &Handle, home: &str, home_concurrent_connections: usize, foreign: &str, foreign_concurrent_connections: usize) -> Result<Self, Error> {
/// Returns new home and foreign HTTP transport connections, falling back
/// to failover urls if necessary.
pub fn new_http(handle: &Handle, home_primary: &RpcUrl, home_failover: Option<&RpcUrl>,
home_concurrent_connections: usize, foreign_primary: &RpcUrl,
foreign_failover: Option<&RpcUrl>, foreign_concurrent_connections: usize)
-> Result<Self, Error> {
// Attempts to connect to either a primary or failover url, returning
// the transport and the url upon success.
fn connect(handle: &Handle, url_primary: &RpcUrl, url_failover: Option<&RpcUrl>,
concurrent_connections: usize) -> Result<(Http, RpcUrlKind), Web3Error> {
match Http::with_event_loop(&url_primary.to_string(), handle, concurrent_connections) {
Ok(t) => Ok((t, RpcUrlKind::Primary(url_primary.clone()))),
Err(err) => match url_failover {
Some(fo) => {
Http::with_event_loop(&fo.to_string(), handle, concurrent_connections)
.map(|h| (h, RpcUrlKind::Failover(fo.clone())))
},
None => Err(err),
},
}
}

let home = Http::with_event_loop(home, handle,home_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to home node rpc")?;
let foreign = Http::with_event_loop(foreign, handle, foreign_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to foreign node rpc")?;
let (home, home_url) = connect(handle, home_primary, home_failover, home_concurrent_connections)
.map_err(|err| ErrorKind::HomeRpcConnection(err))?;
let (foreign, foreign_url) = connect(handle, foreign_primary, foreign_failover, foreign_concurrent_connections)
.map_err(|err| ErrorKind::ForeignRpcConnection(err))?;

let result = Connections {
Ok(Connections {
home,
foreign
};
Ok(result)
home_url,
foreign,
foreign_url,
})
}
}

impl<T: Transport> Connections<T> {
pub fn as_ref(&self) -> Connections<&T> {
Connections {
/// Contains references to the fields of a `Connection`.
pub struct ConnectionsRef<'u, T> where T: Transport {
pub home: T,
pub home_url: &'u RpcUrlKind,
pub foreign: T,
pub foreign_url: &'u RpcUrlKind,
}

impl<'u, T: Transport> ConnectionsRef<'u, T> {
pub fn as_ref(&'u self) -> ConnectionsRef<'u, &T> {
ConnectionsRef {
home: &self.home,
home_url: &self.home_url,
foreign: &self.foreign,
foreign_url: &self.foreign_url,
}
}
}

impl App<Http> {
pub fn new_http<P: AsRef<Path>>(config: Config, database_path: P, handle: &Handle, running: Arc<AtomicBool>) -> Result<Self, Error> {
let home_url:String = format!("{}:{}", config.home.rpc_host, config.home.rpc_port);
let foreign_url:String = format!("{}:{}", config.foreign.rpc_host, config.foreign.rpc_port);
pub fn new_http<P: AsRef<Path>>(config: Config, database_path: P, handle: &Handle,
running: Arc<AtomicBool>) -> Result<Self, Error> {
let connections = Connections::new_http(
handle,
&config.home.primary_rpc,
config.home.failover_rpc.as_ref(),
config.home.concurrent_http_requests,
&config.foreign.primary_rpc,
config.foreign.failover_rpc.as_ref(),
config.foreign.concurrent_http_requests,
)?;

let connections = Connections::new_http(handle, home_url.as_ref(), config.home.concurrent_http_requests, foreign_url.as_ref(), config.foreign.concurrent_http_requests)?;
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore))).map_err(|e| ErrorKind::KeyStore(e))?;
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore)))
.map_err(|e| ErrorKind::KeyStore(e))?;

let keystore = AccountProvider::new(Box::new(keystore), AccountProviderSettings {
enable_hardware_wallets: false,
hardware_wallet_classic_key: false,
unlock_keep_secret: true,
blacklisted_accounts: vec![],
});
keystore.unlock_account_permanently(config.home.account, config.home.password()?).map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.foreign.account, config.foreign.password()?).map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.home.account, config.home.password()?)
.map_err(|e| ErrorKind::AccountError(e))?;
keystore.unlock_account_permanently(config.foreign.account, config.foreign.password()?)
.map_err(|e| ErrorKind::AccountError(e))?;

let max_timeout = config.clone().home.request_timeout.max(config.clone().foreign.request_timeout);

Expand All @@ -96,3 +134,4 @@ impl App<Http> {
Ok(result)
}
}

20 changes: 14 additions & 6 deletions bridge/src/bridge/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,21 @@ impl<T: Transport + Clone> Future for Deploy<T> {
data: test_data.into(),
};

let main_future = api::send_transaction_with_nonce(self.app.connections.home.clone(), self.app.clone(),
self.app.config.home.clone(), main_tx, self.home_chain_id,
TransactionWithConfirmation(self.app.connections.home.clone(), self.app.config.home.poll_interval, self.app.config.home.required_confirmations));
let main_future = api::send_transaction_with_nonce(
self.app.connections.home.clone(), self.app.connections.home_url.clone(),
self.app.clone(), self.app.config.home.clone(), main_tx, self.home_chain_id,
TransactionWithConfirmation(self.app.connections.home.clone(),
self.app.config.home.poll_interval,
self.app.config.home.required_confirmations)
);

let test_future = api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(),
self.app.config.foreign.clone(), test_tx, self.foreign_chain_id,
TransactionWithConfirmation(self.app.connections.foreign.clone(), self.app.config.foreign.poll_interval, self.app.config.foreign.required_confirmations));
let test_future = api::send_transaction_with_nonce(
self.app.connections.foreign.clone(), self.app.connections.foreign_url.clone(),
self.app.clone(), self.app.config.foreign.clone(), test_tx, self.foreign_chain_id,
TransactionWithConfirmation(self.app.connections.foreign.clone(),
self.app.config.foreign.poll_interval,
self.app.config.foreign.required_confirmations)
);

DeployState::Deploying(main_future.join(test_future))
}
Expand Down
8 changes: 5 additions & 3 deletions bridge/src/bridge/deposit_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
let gas = U256::from(self.app.config.txs.deposit_relay.gas);
let gas_price = U256::from(*self.foreign_gas_price.read().unwrap());
let balance_required = gas * gas_price * U256::from(item.logs.len());

if balance_required > *foreign_balance.as_ref().unwrap() {
return Err(ErrorKind::InsufficientFunds.into())
}
Expand All @@ -110,8 +110,10 @@ impl<T: Transport> Stream for DepositRelay<T> {
nonce: U256::zero(),
action: Action::Call(self.foreign_contract.clone()),
};
api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(), self.app.config.foreign.clone(),
tx, self.foreign_chain_id, SendRawTransaction(self.app.connections.foreign.clone()))
api::send_transaction_with_nonce(self.app.connections.foreign.clone(),
self.app.connections.foreign_url.clone(), self.app.clone(),
self.app.config.foreign.clone(), tx, self.foreign_chain_id,
SendRawTransaction(self.app.connections.foreign.clone()))
}).collect_vec();

info!("relaying {} deposits", len);
Expand Down
24 changes: 13 additions & 11 deletions bridge/src/bridge/gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod tests {
use super::*;
use error::{Error, ErrorKind};
use futures::{Async, future::{err, ok, FutureResult}};
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY};
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY, RpcUrl};
use tokio_timer::Timer;
use std::time::Duration;
use std::path::PathBuf;
Expand All @@ -168,8 +168,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -211,8 +211,10 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
// rpc_host: "https://rpc".into(),
// rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -254,8 +256,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -296,8 +298,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down Expand Up @@ -338,8 +340,8 @@ mod tests {
request_timeout: Duration::from_secs(5),
poll_interval: Duration::from_secs(1),
required_confirmations: 0,
rpc_host: "https://rpc".into(),
rpc_port: 443,
primary_rpc: RpcUrl { host: "https://rpc".into(), port: 443 },
failover_rpc: None,
password: PathBuf::from("password"),
info: NodeInfo::default(),
gas_price_oracle_url: Some("https://gas.price".into()),
Expand Down
12 changes: 9 additions & 3 deletions bridge/src/bridge/nonce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use web3::types::{U256, H256, Bytes};
use ethcore_transaction::Transaction;
use api::{self, ApiCall};
use error::{Error, ErrorKind};
use config::Node;
use config::{Node, RpcUrlKind};
use transaction::prepare_raw_transaction;
use app::App;
use std::sync::Arc;
Expand All @@ -32,6 +32,8 @@ enum NonceCheckState<T: Transport, S: TransactionSender> {
pub struct NonceCheck<T: Transport, S: TransactionSender> {
app: Arc<App<T>>,
transport: T,
/// Used for logging:
rpc_url: RpcUrlKind,
state: NonceCheckState<T, S>,
node: Node,
transaction: Transaction,
Expand All @@ -48,11 +50,14 @@ impl<T: Transport, S: TransactionSender> Debug for NonceCheck<T, S> {

}

pub fn send_transaction_with_nonce<T: Transport + Clone, S: TransactionSender>(transport: T, app: Arc<App<T>>, node: Node, transaction: Transaction, chain_id: u64, sender: S) -> NonceCheck<T, S> {
pub fn send_transaction_with_nonce<T, S>(transport: T, rpc_url: RpcUrlKind, app: Arc<App<T>>,
node: Node, transaction: Transaction, chain_id: u64, sender: S) -> NonceCheck<T, S>
where T: Transport + Clone, S: TransactionSender {
NonceCheck {
app,
state: NonceCheckState::Ready,
transport,
rpc_url,
node,
transaction,
chain_id,
Expand Down Expand Up @@ -108,7 +113,8 @@ impl<T: Transport, S: TransactionSender> Future for NonceCheck<T, S> {
NonceCheckState::Reacquire
} else if rpc_err.code == rpc::ErrorCode::ServerError(-32010) && rpc_err.message.ends_with("already imported.") {
let hash = self.transaction.hash(Some(self.chain_id));
info!("{} already imported on {}, skipping", hash, self.node.rpc_host);
// info!("{} already imported on {}, skipping", hash, self.node.rpc_host);
info!("{} already imported on {}, skipping", hash, self.rpc_url);
return Ok(Async::Ready(self.sender.ignore(hash)))
} else {
return Err(ErrorKind::Web3(web3::error::ErrorKind::Rpc(rpc_err).into()).into());
Expand Down
6 changes: 4 additions & 2 deletions bridge/src/bridge/withdraw_confirm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
nonce: U256::zero(),
action: Action::Call(contract),
};
api::send_transaction_with_nonce(self.app.connections.foreign.clone(), self.app.clone(), self.app.config.foreign.clone(),
tx, self.foreign_chain_id, SendRawTransaction(self.app.connections.foreign.clone()))
api::send_transaction_with_nonce(self.app.connections.foreign.clone(),
self.app.connections.foreign_url.clone(), self.app.clone(),
self.app.config.foreign.clone(), tx, self.foreign_chain_id,
SendRawTransaction(self.app.connections.foreign.clone()))
}).collect_vec();

info!("submitting {} signatures", len);
Expand Down
4 changes: 3 additions & 1 deletion bridge/src/bridge/withdraw_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
let contract = self.home_contract.clone();
let home = &self.app.config.home;
let t = &self.app.connections.home;
let t_url = &self.app.connections.home_url;
let chain_id = self.home_chain_id;

loop {
Expand Down Expand Up @@ -232,7 +233,8 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
nonce: U256::zero(),
action: Action::Call(contract),
};
api::send_transaction_with_nonce(t.clone(), app.clone(), home.clone(), tx, chain_id, SendRawTransaction(t.clone()))
api::send_transaction_with_nonce(t.clone(), t_url.clone(), app.clone(),
home.clone(), tx, chain_id, SendRawTransaction(t.clone()))
}).collect_vec();

info!("relaying {} withdraws", len);
Expand Down
Loading

0 comments on commit 9cc6814

Please sign in to comment.