Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat(contract): allow events to be streamed
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst committed Jun 12, 2020
1 parent 0e004ce commit 926fe3b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
1 change: 1 addition & 0 deletions ethers-contract/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ rustc-hex = { version = "2.1.0", default-features = false }
thiserror = { version = "1.0.19", default-features = false }
once_cell = { version = "1.4.0", default-features = false }
tokio = { version = "0.2.21", default-features = false }
futures = "0.3.5"

[dev-dependencies]
tokio = { version = "0.2.21", default-features = false, features = ["macros"] }
Expand Down
20 changes: 17 additions & 3 deletions ethers-contract/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::ContractError;

use ethers_providers::{JsonRpcClient, Provider};
use ethers_providers::{FilterStream, JsonRpcClient, Provider};

use ethers_core::{
abi::{Detokenize, Event as AbiEvent, RawLog},
types::{BlockNumber, Filter, Log, ValueOrArray, H256},
};

use futures::stream::{Stream, StreamExt};
use std::{collections::HashMap, marker::PhantomData};

/// Helper for managing the event filter before querying or streaming its logs
Expand Down Expand Up @@ -60,6 +61,21 @@ impl<P, D: Detokenize> Event<'_, '_, P, D> {
}
}

impl<'a, 'b, P, D> Event<'a, 'b, P, D>
where
P: JsonRpcClient,
D: 'b + Detokenize + Clone,
'a: 'b,
{
/// Returns a stream for the event
pub async fn stream(
self,
) -> Result<impl Stream<Item = Result<D, ContractError>> + 'b, ContractError> {
let filter = self.provider.watch(&self.filter).await?;
Ok(filter.stream().map(move |log| self.parse_log(log)))
}
}

impl<P, D> Event<'_, '_, P, D>
where
P: JsonRpcClient,
Expand Down Expand Up @@ -107,6 +123,4 @@ where
// convert the tokens to the requested datatype
Ok(D::from_tokens(tokens)?)
}

// TODO: Add filter watchers
}
8 changes: 4 additions & 4 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ pub enum ProviderError {

/// Types of filters supported by the JSON-RPC.
#[derive(Clone, Debug)]
pub enum FilterKind {
pub enum FilterKind<'a> {
/// `eth_newBlockFilter`
Logs(Filter),
Logs(&'a Filter),

/// `eth_newBlockFilter` filter
NewBlocks,
Expand Down Expand Up @@ -326,7 +326,7 @@ impl<P: JsonRpcClient> Provider<P> {
/// Streams matching filter logs
pub async fn watch(
&self,
filter: Filter,
filter: &Filter,
) -> Result<impl FilterStream<Log> + '_, ProviderError> {
let id = self.new_filter(FilterKind::Logs(filter)).await?;
let fut = move || Box::pin(self.get_filter_changes(id));
Expand All @@ -351,7 +351,7 @@ impl<P: JsonRpcClient> Provider<P> {

/// Creates a filter object, based on filter options, to notify when the state changes (logs).
/// To check if the state has changed, call `get_filter_changes` with the filter id.
pub async fn new_filter(&self, filter: FilterKind) -> Result<U256, ProviderError> {
pub async fn new_filter<'a>(&self, filter: FilterKind<'a>) -> Result<U256, ProviderError> {
let (method, args) = match filter {
FilterKind::NewBlocks => ("eth_newBlockFilter", utils::serialize(&())),
FilterKind::PendingTransactions => {
Expand Down
3 changes: 2 additions & 1 deletion ethers-providers/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::ProviderError;
use ethers_core::types::U256;

use futures_core::{stream::Stream, TryFuture};
use futures_util::StreamExt;
use pin_project::pin_project;
use serde::Deserialize;
use std::{
Expand All @@ -17,7 +18,7 @@ use tokio::time::{interval, Interval};
const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000);

/// Trait for streaming filters. You can get the id.
pub trait FilterStream<R>: Stream<Item = R>
pub trait FilterStream<R>: StreamExt + Stream<Item = R>
where
R: for<'de> Deserialize<'de>,
{
Expand Down

0 comments on commit 926fe3b

Please sign in to comment.