Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat: stream_with_meta (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst authored Jul 30, 2021
1 parent c8d9f9b commit 9fc142c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 40 deletions.
39 changes: 2 additions & 37 deletions ethers-contract/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{stream::EventStream, ContractError, EthLogDecode};
use crate::{log::LogMeta, stream::EventStream, ContractError, EthLogDecode};

use ethers_core::{
abi::{Detokenize, RawLog},
types::{Address, BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U256, U64},
types::{BlockNumber, Filter, Log, ValueOrArray, H256},
};
use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream};
use std::borrow::Cow;
Expand Down Expand Up @@ -214,38 +214,3 @@ where
.map_err(From::from)
}
}

/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,

/// The block in which the log was emitted
pub block_number: U64,

/// The block hash in which the log was emitted
pub block_hash: H256,

/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,

/// Transactions index position log was created from
pub transaction_index: U64,

/// Log index position in the block
pub log_index: U256,
}

impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}
4 changes: 2 additions & 2 deletions ethers-contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ mod factory;
pub use factory::ContractFactory;

mod event;
pub use event::{EthEvent, LogMeta};
pub use event::EthEvent;

mod log;
pub use log::{decode_logs, EthLogDecode};
pub use log::{decode_logs, EthLogDecode, LogMeta};

mod stream;

Expand Down
36 changes: 36 additions & 0 deletions ethers-contract/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Mod of types for ethereum logs
use ethers_core::abi::Error;
use ethers_core::abi::RawLog;
use ethers_core::types::{Address, Log, TxHash, H256, U256, U64};

/// A trait for types (events) that can be decoded from a `RawLog`
pub trait EthLogDecode: Send + Sync {
Expand All @@ -14,3 +15,38 @@ pub trait EthLogDecode: Send + Sync {
pub fn decode_logs<T: EthLogDecode>(logs: &[RawLog]) -> Result<Vec<T>, Error> {
logs.iter().map(T::decode_log).collect()
}

/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,

/// The block in which the log was emitted
pub block_number: U64,

/// The block hash in which the log was emitted
pub block_hash: H256,

/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,

/// Transactions index position log was created from
pub transaction_index: U64,

/// Log index position in the block
pub log_index: U256,
}

impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}
30 changes: 30 additions & 0 deletions ethers-contract/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::LogMeta;
use ethers_core::types::{Log, U256};
use futures_util::stream::{Stream, StreamExt};
use pin_project::pin_project;
Expand All @@ -19,6 +20,12 @@ pub struct EventStream<'a, T, R, E> {
parse: MapEvent<'a, R, E>,
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
EventStreamMeta(self)
}
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self {
Self { id, stream, parse }
Expand All @@ -39,3 +46,26 @@ where
}
}
}

#[pin_project]
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);

impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
{
type Item = Result<(R, LogMeta), E>;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match futures_util::ready!(this.0.stream.poll_next_unpin(ctx)) {
Some(item) => {
let meta = LogMeta::from(&item);
let res = (this.0.parse)(item);
let res = res.map(|inner| (inner, meta));
Poll::Ready(Some(res))
}
None => Poll::Pending,
}
}
}
17 changes: 16 additions & 1 deletion ethers-contract/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ mod eth_tests {
let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol");
let ganache = Ganache::new().spawn();
let client = connect(&ganache, 0);
let contract = deploy(client, abi.clone(), bytecode).await;
let contract = deploy(client.clone(), abi.clone(), bytecode).await;

// We spawn the event listener:
let event = contract.event::<ValueChanged>();
Expand All @@ -292,9 +292,13 @@ mod eth_tests {
let mut subscription = event2.subscribe().await.unwrap();
assert_eq!(subscription.id, 2.into());

let mut subscription_meta = event2.subscribe().await.unwrap().with_meta();
assert_eq!(subscription_meta.0.id, 3.into());

let num_calls = 3u64;

// and we make a few calls
let num = client.get_block_number().await.unwrap();
for i in 0..num_calls {
let call = contract
.method::<_, H256>("setValue", i.to_string())
Expand All @@ -307,8 +311,19 @@ mod eth_tests {
// unwrap the option of the stream, then unwrap the decoding result
let log = stream.next().await.unwrap().unwrap();
let log2 = subscription.next().await.unwrap().unwrap();
let (log3, meta) = subscription_meta.next().await.unwrap().unwrap();
assert_eq!(log.new_value, log3.new_value);
assert_eq!(log.new_value, log2.new_value);
assert_eq!(log.new_value, i.to_string());
assert_eq!(meta.block_number, num + i + 1);
let hash = client
.get_block(num + i + 1)
.await
.unwrap()
.unwrap()
.hash
.unwrap();
assert_eq!(meta.block_hash, hash);
}
}

Expand Down

0 comments on commit 9fc142c

Please sign in to comment.