Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the transport layer to use GAT #2

Open
wants to merge 2 commits into
base: pink
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 125 additions & 119 deletions src/api/eth.rs

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions src/api/eth_filter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! `Eth` namespace, filters.
use crate::helpers::CallFuture;
use crate::prelude::*;
use crate::{
api::Namespace,
Expand Down Expand Up @@ -111,7 +110,7 @@ impl<T: Transport, I> BaseFilter<T, I> {
Self: Sized,
{
let id = helpers::serialize(&self.id);
Ok(CallFuture::new(self.transport.execute("eth_uninstallFilter", vec![id])).await?)
self.transport.execute("eth_uninstallFilter", vec![id]).await
}

/// Borrows the transport.
Expand All @@ -125,7 +124,7 @@ impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
/// Will return logs that happened after previous poll.
pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
let id = helpers::serialize(&self.id);
Ok(CallFuture::new(self.transport.execute("eth_getFilterChanges", vec![id])).await?)
self.transport.execute("eth_getFilterChanges", vec![id]).await
}

/// Returns the stream of items which automatically polls the server
Expand All @@ -138,7 +137,7 @@ impl<T: Transport> BaseFilter<T, Log> {
/// Returns future with all logs matching given filter
pub async fn logs(&self) -> error::Result<Vec<Log>> {
let id = helpers::serialize(&self.id);
Ok(CallFuture::new(self.transport.execute("eth_getFilterLogs", vec![id])).await?)
self.transport.execute("eth_getFilterLogs", vec![id]).await
}
}

Expand All @@ -147,7 +146,7 @@ async fn create_filter<'a, T: Transport, F: FilterInterface>(
transport: T,
arg: Vec<crate::Value<'a>>,
) -> error::Result<BaseFilter<T, F::Output>> {
let id = CallFuture::new(transport.execute(F::constructor(), arg)).await?;
let id = transport.execute(F::constructor(), arg).await?;
Ok(BaseFilter {
id,
transport,
Expand Down
8 changes: 4 additions & 4 deletions src/api/eth_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
pub async fn unsubscribe(self) -> error::Result<bool> {
let &SubscriptionId(ref id) = &self.id;
let id = helpers::serialize(&id);
Ok(CallFuture::new(self.transport.execute("eth_unsubscribe", vec![id])).await?)
self.transport.execute("eth_unsubscribe", vec![id]).await
}
}

Expand Down Expand Up @@ -107,7 +107,7 @@ impl<T: DuplexTransport> EthSubscribe<T> {
/// Create a new heads subscription
pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
let subscription = helpers::serialize(&&"newHeads");
let id = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription]).await?;
let id = self.transport.execute("eth_subscribe", vec![subscription]).await?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

Expand All @@ -126,14 +126,14 @@ impl<T: DuplexTransport> EthSubscribe<T> {
/// Create a pending transactions subscription
pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
let subscription = helpers::serialize(&&"newPendingTransactions");
let id = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription])).await?;
let id = self.transport.execute("eth_subscribe", vec![subscription]).await?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}

/// Create a sync status subscription
pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
let subscription = helpers::serialize(&&"syncing");
let id = CallFuture::new(self.transport.execute("eth_subscribe", vec![subscription])).await?;
let id = self.transport.execute("eth_subscribe", vec![subscription]).await?;
SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
}
}
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::{
types::{Bytes, TransactionReceipt, TransactionRequest, U64},
Transport,
};
use futures::Future;
use core::time::Duration;
use futures::Future;

/// Common API for all namespaces
pub trait Namespace<T: Transport>: Clone {
Expand Down
14 changes: 7 additions & 7 deletions src/api/net.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! `Net` namespace
use crate::prelude::*;
use crate::{api::Namespace, helpers::CallFuture, types::U256, Transport};
use crate::{api::Namespace, error::Result, types::U256, Transport};

/// `Net` namespace
#[derive(Debug, Clone)]
Expand All @@ -23,18 +23,18 @@ impl<T: Transport> Namespace<T> for Net<T> {

impl<T: Transport> Net<T> {
/// Returns the network id.
pub fn version(&self) -> CallFuture<String, T::Out> {
CallFuture::new(self.transport.execute("net_version", vec![]))
pub async fn version(&self) -> Result<String> {
self.transport.execute("net_version", vec![]).await
}

/// Returns number of peers connected to node.
pub fn peer_count(&self) -> CallFuture<U256, T::Out> {
CallFuture::new(self.transport.execute("net_peerCount", vec![]))
pub async fn peer_count(&self) -> Result<U256> {
self.transport.execute("net_peerCount", vec![]).await
}

/// Whether the node is listening for network connections
pub fn is_listening(&self) -> CallFuture<bool, T::Out> {
CallFuture::new(self.transport.execute("net_listening", vec![]))
pub async fn is_listening(&self) -> Result<bool> {
self.transport.execute("net_listening", vec![]).await
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/api/parity.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::prelude::*;
use crate::{
api::Namespace,
helpers::{self, CallFuture},
error::Result,
helpers,
types::{Bytes, CallRequest, ParityPendingTransactionFilter, Transaction},
Transport,
};
Expand All @@ -27,27 +28,27 @@ impl<T: Transport> Namespace<T> for Parity<T> {

impl<T: Transport> Parity<T> {
/// Sequentially call multiple contract methods in one request without changing the state of the blockchain.
pub fn call(&self, reqs: Vec<CallRequest>) -> CallFuture<Vec<Bytes>, T::Out> {
pub async fn call(&self, reqs: Vec<CallRequest>) -> Result<Vec<Bytes>> {
let reqs = helpers::serialize(&reqs);

CallFuture::new(self.transport.execute("parity_call", vec![reqs]))
self.transport.execute("parity_call", vec![reqs]).await
}

/// Get pending transactions
/// Blocked by https://github.com/openethereum/openethereum/issues/159
pub fn pending_transactions(
pub async fn pending_transactions(
&self,
limit: Option<usize>,
filter: Option<ParityPendingTransactionFilter>,
) -> CallFuture<Vec<Transaction>, T::Out> {
) -> Result<Vec<Transaction>> {
let limit = helpers::serialize(&limit);
let filter = filter.as_ref().map(helpers::serialize);
let params = match (limit, filter) {
(l, Some(f)) => vec![l, f],
(l, None) => vec![l],
};

CallFuture::new(self.transport.execute("parity_pendingTransactions", params))
self.transport.execute("parity_pendingTransactions", params).await
}
}

Expand Down
29 changes: 18 additions & 11 deletions src/api/parity_accounts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
api::Namespace,
helpers::{self, CallFuture},
error::Result,
helpers,
types::{Address, H256},
Transport,
};
Expand All @@ -26,37 +27,43 @@ impl<T: Transport> Namespace<T> for ParityAccounts<T> {

impl<T: Transport> ParityAccounts<T> {
/// Given an address of an account and its password deletes the account from the parity node
pub fn parity_kill_account(&self, address: &Address, pwd: &str) -> CallFuture<bool, T::Out> {
pub async fn parity_kill_account(&self, address: &Address, pwd: &str) -> Result<bool> {
let address = helpers::serialize(&address);
let pwd = helpers::serialize(&pwd);
CallFuture::new(self.transport.execute("parity_killAccount", vec![address, pwd]))
self.transport.execute("parity_killAccount", vec![address, pwd]).await
}
/// Imports an account from a given seed/phrase
/// Retunrs the address of the corresponding seed vinculated account
pub fn parity_new_account_from_phrase(&self, seed: &str, pwd: &str) -> CallFuture<Address, T::Out> {
pub async fn parity_new_account_from_phrase(&self, seed: &str, pwd: &str) -> Result<Address> {
let seed = helpers::serialize(&seed);
let pwd = helpers::serialize(&pwd);
CallFuture::new(self.transport.execute("parity_newAccountFromPhrase", vec![seed, pwd]))
self.transport
.execute("parity_newAccountFromPhrase", vec![seed, pwd])
.await
}
/// Imports an account from a given secret key.
/// Returns the address of the corresponding Sk vinculated account.
pub fn new_account_from_secret(&self, secret: &H256, pwd: &str) -> CallFuture<Address, T::Out> {
pub async fn new_account_from_secret(&self, secret: &H256, pwd: &str) -> Result<Address> {
let secret = helpers::serialize(&secret);
let pwd = helpers::serialize(&pwd);
CallFuture::new(self.transport.execute("parity_newAccountFromSecret", vec![secret, pwd]))
self.transport
.execute("parity_newAccountFromSecret", vec![secret, pwd])
.await
}
/// Imports an account from a JSON encoded Wallet file.
/// Returns the address of the corresponding wallet.
pub fn parity_new_account_from_wallet(&self, wallet: &str, pwd: &str) -> CallFuture<Address, T::Out> {
pub async fn parity_new_account_from_wallet(&self, wallet: &str, pwd: &str) -> Result<Address> {
let wallet = helpers::serialize(&wallet);
let pwd = helpers::serialize(&pwd);
CallFuture::new(self.transport.execute("parity_newAccountFromWallet", vec![wallet, pwd]))
self.transport
.execute("parity_newAccountFromWallet", vec![wallet, pwd])
.await
}
/// Removes the address of the Parity node addressbook.
/// Returns true if the operation suceeded.
pub fn parity_remove_address(&self, address: &Address) -> CallFuture<bool, T::Out> {
pub async fn parity_remove_address(&self, address: &Address) -> Result<bool> {
let address = helpers::serialize(&address);
CallFuture::new(self.transport.execute("parity_removeAddress", vec![address]))
self.transport.execute("parity_removeAddress", vec![address]).await
}
}

Expand Down
89 changes: 49 additions & 40 deletions src/api/parity_set.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::prelude::*;
use crate::{
api::Namespace,
helpers::{self, CallFuture},
error::Result,
helpers,
types::{Address, ParityPeerType, H256},
Transport,
};
Expand All @@ -24,110 +25,118 @@ impl<T: Transport> Namespace<T> for ParitySet<T> {

impl<T: Transport> ParitySet<T> {
/// Set Parity to accept non-reserved peers (default behavior)
pub fn accept_non_reserved_peers(&self) -> CallFuture<bool, T::Out> {
CallFuture::new(self.transport().execute("parity_acceptNonReservedPeers", vec![]))
pub async fn accept_non_reserved_peers(&self) -> Result<bool> {
self.transport().execute("parity_acceptNonReservedPeers", vec![]).await
}

/// Add a reserved peer
pub fn add_reserved_peer(&self, enode: &str) -> CallFuture<bool, T::Out> {
pub async fn add_reserved_peer(&self, enode: &str) -> Result<bool> {
let enode = helpers::serialize(&enode);
CallFuture::new(self.transport().execute("parity_addReservedPeer", vec![enode]))
self.transport().execute("parity_addReservedPeer", vec![enode]).await
}

/// Set Parity to drop all non-reserved peers. To restore default behavior call parity_acceptNonReservedPeers
pub fn drop_non_reserved_peers(&self) -> CallFuture<bool, T::Out> {
CallFuture::new(self.transport().execute("parity_dropNonReservedPeers", vec![]))
pub async fn drop_non_reserved_peers(&self) -> Result<bool> {
self.transport().execute("parity_dropNonReservedPeers", vec![]).await
}

/// Get list of connected/connecting peers.
pub fn parity_net_peers(&self) -> CallFuture<ParityPeerType, T::Out> {
CallFuture::new(self.transport.execute("parity_netPeers", vec![]))
pub async fn parity_net_peers(&self) -> Result<ParityPeerType> {
self.transport.execute("parity_netPeers", vec![]).await
}

/// Attempts to upgrade Parity to the version specified in parity_upgradeReady
pub fn execute_upgrade(&self) -> CallFuture<bool, T::Out> {
CallFuture::new(self.transport().execute("parity_executeUpgrade", vec![]))
pub async fn execute_upgrade(&self) -> Result<bool> {
self.transport().execute("parity_executeUpgrade", vec![]).await
}

/// Creates a hash of a file at a given URL
pub fn hash_content(&self, url: &str) -> CallFuture<H256, T::Out> {
pub async fn hash_content(&self, url: &str) -> Result<H256> {
let url = helpers::serialize(&url);
CallFuture::new(self.transport().execute("parity_hashContent", vec![url]))
self.transport().execute("parity_hashContent", vec![url]).await
}

/// Remove a reserved peer
pub fn remove_reserved_peer(&self, enode: &str) -> CallFuture<bool, T::Out> {
pub async fn remove_reserved_peer(&self, enode: &str) -> Result<bool> {
let enode = helpers::serialize(&enode);
CallFuture::new(self.transport().execute("parity_removeReservedPeer", vec![enode]))
self.transport().execute("parity_removeReservedPeer", vec![enode]).await
}

/// Changes author (coinbase) for mined blocks
pub fn set_author(&self, author: &Address) -> CallFuture<bool, T::Out> {
pub async fn set_author(&self, author: &Address) -> Result<bool> {
let address = helpers::serialize(&author);
CallFuture::new(self.transport().execute("parity_setAuthor", vec![address]))
self.transport().execute("parity_setAuthor", vec![address]).await
}

/// Sets the network spec file Parity is using
pub fn set_chain(&self, chain: &str) -> CallFuture<bool, T::Out> {
pub async fn set_chain(&self, chain: &str) -> Result<bool> {
let chain = helpers::serialize(&chain);
CallFuture::new(self.transport().execute("parity_setChain", vec![chain]))
self.transport().execute("parity_setChain", vec![chain]).await
}

/// Sets an authority account for signing consensus messages
pub fn set_engine_signer(&self, address: &Address, password: &str) -> CallFuture<bool, T::Out> {
pub async fn set_engine_signer(&self, address: &Address, password: &str) -> Result<bool> {
let address = helpers::serialize(&address);
let password = helpers::serialize(&password);
CallFuture::new(
self.transport()
.execute("parity_setEngineSigner", vec![address, password]),
)

self.transport()
.execute("parity_setEngineSigner", vec![address, password])
.await
}

/// Changes extra data for newly mined blocks
pub fn set_extra_data(&self, data: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_extra_data(&self, data: &H256) -> Result<bool> {
let data = helpers::serialize(&data);
CallFuture::new(self.transport().execute("parity_setExtraData", vec![data]))
self.transport().execute("parity_setExtraData", vec![data]).await
}

/// Sets new gas ceiling target for mined blocks
pub fn set_gas_ceil_target(&self, quantity: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_gas_ceil_target(&self, quantity: &H256) -> Result<bool> {
let quantity = helpers::serialize(&quantity);
CallFuture::new(self.transport().execute("parity_setGasCeilTarget", vec![quantity]))
self.transport()
.execute("parity_setGasCeilTarget", vec![quantity])
.await
}

/// Sets a new gas floor target for mined blocks
pub fn set_gas_floor_target(&self, quantity: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_gas_floor_target(&self, quantity: &H256) -> Result<bool> {
let quantity = helpers::serialize(&quantity);
CallFuture::new(self.transport().execute("parity_setGasFloorTarget", vec![quantity]))
self.transport()
.execute("parity_setGasFloorTarget", vec![quantity])
.await
}

/// Sets the maximum amount of gas a single transaction may consume
pub fn set_max_transaction_gas(&self, quantity: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_max_transaction_gas(&self, quantity: &H256) -> Result<bool> {
let quantity = helpers::serialize(&quantity);
CallFuture::new(self.transport().execute("parity_setMaxTransactionGas", vec![quantity]))
self.transport()
.execute("parity_setMaxTransactionGas", vec![quantity])
.await
}

/// Changes minimal gas price for transaction to be accepted to the queue
pub fn set_min_gas_price(&self, quantity: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_min_gas_price(&self, quantity: &H256) -> Result<bool> {
let quantity = helpers::serialize(&quantity);
CallFuture::new(self.transport().execute("parity_setMinGasPrice", vec![quantity]))
self.transport().execute("parity_setMinGasPrice", vec![quantity]).await
}

/// Changes the operating mode of Parity.
pub fn set_mode(&self, mode: &str) -> CallFuture<bool, T::Out> {
pub async fn set_mode(&self, mode: &str) -> Result<bool> {
let mode = helpers::serialize(&mode);
CallFuture::new(self.transport().execute("parity_setMode", vec![mode]))
self.transport().execute("parity_setMode", vec![mode]).await
}

/// Changes limit for transactions in queue. (NOT WORKING !)
pub fn set_transactions_limit(&self, limit: &H256) -> CallFuture<bool, T::Out> {
pub async fn set_transactions_limit(&self, limit: &H256) -> Result<bool> {
let limit = helpers::serialize(&limit);
CallFuture::new(self.transport().execute("parity_setTransactionsLimit", vec![limit]))
self.transport()
.execute("parity_setTransactionsLimit", vec![limit])
.await
}

/// Returns a ReleaseInfo object describing the release which is available for upgrade or null if none is available.
pub fn upgrade_ready(&self) -> CallFuture<Option<String>, T::Out> {
CallFuture::new(self.transport().execute("parity_upgradeReady", vec![]))
pub async fn upgrade_ready(&self) -> Result<Option<String>> {
self.transport().execute("parity_upgradeReady", vec![]).await
}
}

Expand Down
Loading