Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
perf(providers): replace wake_by_ref with loop (#1428)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jun 29, 2022
1 parent 7e85b33 commit 73d3d3f
Showing 1 changed file with 32 additions and 39 deletions.
71 changes: 32 additions & 39 deletions ethers-providers/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#![allow(clippy::return_self_not_must_use)]

use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError};

use ethers_core::types::{Transaction, TxHash, U256};

use futures_core::{stream::Stream, Future};
use futures_util::{stream, stream::FuturesUnordered, FutureExt, StreamExt};
use pin_project::pin_project;
Expand Down Expand Up @@ -37,8 +35,8 @@ enum FilterWatcherState<'a, R> {
}

#[must_use = "filters do nothing unless you stream them"]
#[pin_project]
/// Streams data from an installed filter via `eth_getFilterChanges`
#[pin_project]
pub struct FilterWatcher<'a, P, R> {
/// The filter's installed id on the ethereum node
pub id: U256,
Expand All @@ -47,7 +45,7 @@ pub struct FilterWatcher<'a, P, R> {

// The polling interval
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,

/// statemachine driven by the Stream impl
state: FilterWatcherState<'a, R>,
}

Expand Down Expand Up @@ -79,8 +77,7 @@ where
}
}

// Pattern for flattening the returned Vec of filter changes taken from
// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67
// Advances the filter's state machine
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
Expand All @@ -89,42 +86,38 @@ where
type Item = R;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut this = self.project();
let id = *this.id;

*this.state = match this.state {
FilterWatcherState::WaitForInterval => {
// Wait the polling period
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));

// create a new instance of the future
cx.waker().wake_by_ref();
let fut = Box::pin(this.provider.get_filter_changes(id));
FilterWatcherState::GetFilterChanges(fut)
}
FilterWatcherState::GetFilterChanges(fut) => {
// NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
cx.waker().wake_by_ref();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
// the next call will immediately go to this branch instead of trying to get
// filter changes again. Once the whole vector is consumed, it will poll again
// for new logs
FilterWatcherState::NextItem(iter) => {
cx.waker().wake_by_ref();
match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
loop {
*this.state = match &mut this.state {
FilterWatcherState::WaitForInterval => {
// Wait the polling period
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
let fut = Box::pin(this.provider.get_filter_changes(id));
FilterWatcherState::GetFilterChanges(fut)
}
}
};

Poll::Pending
FilterWatcherState::GetFilterChanges(fut) => {
// NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> =
futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
// the next call will immediately go to this branch instead of trying to get
// filter changes again. Once the whole vector is consumed, it will poll again
// for new logs
FilterWatcherState::NextItem(iter) => {
if let item @ Some(_) = iter.next() {
return Poll::Ready(item)
}
FilterWatcherState::WaitForInterval
}
};
}
}
}

Expand Down

0 comments on commit 73d3d3f

Please sign in to comment.