Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 80 additions & 56 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1813,76 +1813,100 @@ impl BlockChainClient for Client {
self.engine.additional_params().into_iter().collect()
}

fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> {
// Wrap the logic inside a closure so that we can take advantage of question mark syntax.
let fetch_logs = || {
let chain = self.chain.read();
fn logs(&self, filter: Filter) -> Result<Vec<LocalizedLogEntry>, BlockId> {
let chain = self.chain.read();

// First, check whether `filter.from_block` and `filter.to_block` is on the canon chain. If so, we can use the
// optimized version.
let is_canon = |id| {
match id {
// If it is referred by number, then it is always on the canon chain.
&BlockId::Earliest | &BlockId::Latest | &BlockId::Number(_) => true,
// If it is referred by hash, we see whether a hash -> number -> hash conversion gives us the same
// result.
&BlockId::Hash(ref hash) => chain.is_canon(hash),
}
};
// First, check whether `filter.from_block` and `filter.to_block` is on the canon chain. If so, we can use the
// optimized version.
let is_canon = |id| {
match id {
// If it is referred by number, then it is always on the canon chain.
&BlockId::Earliest | &BlockId::Latest | &BlockId::Number(_) => true,
// If it is referred by hash, we see whether a hash -> number -> hash conversion gives us the same
// result.
&BlockId::Hash(ref hash) => chain.is_canon(hash),
}
};

let blocks = if is_canon(&filter.from_block) && is_canon(&filter.to_block) {
// If we are on the canon chain, use bloom filter to fetch required hashes.
let from = self.block_number_ref(&filter.from_block)?;
let to = self.block_number_ref(&filter.to_block)?;
let blocks = if is_canon(&filter.from_block) && is_canon(&filter.to_block) {
// If we are on the canon chain, use bloom filter to fetch required hashes.
//
// If we are sure the block does not exist (where val > best_block_number), then return error. Note that we
// don't need to care about pending blocks here because RPC query sets pending back to latest (or handled
// pending logs themselves).
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our current use cases are mostly fine. We have cases where we set fromBlock to a nonexistent future block (in filter_changes), but we actually just expect empty result in that case. I didn't find any case where we set fromBlock to a current block, but toBlock to a nonexistent block.

But please do note that this is a breaking change.

Copy link
Copy Markdown
Contributor

@andresilva andresilva Aug 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to validate that from <= to as well? I don't think we validate that on the RPC layer.

let from = match self.block_number_ref(&filter.from_block) {
Some(val) if val <= chain.best_block_number() => val,
_ => return Err(filter.from_block.clone()),
};
let to = match self.block_number_ref(&filter.to_block) {
Some(val) if val <= chain.best_block_number() => val,
_ => return Err(filter.to_block.clone()),
};

chain.blocks_with_bloom(&filter.bloom_possibilities(), from, to)
.into_iter()
.filter_map(|n| chain.block_hash(n))
.collect::<Vec<H256>>()
} else {
// Otherwise, we use a slower version that finds a link between from_block and to_block.
let from_hash = Self::block_hash(&chain, filter.from_block)?;
let from_number = chain.block_number(&from_hash)?;
let to_hash = Self::block_hash(&chain, filter.to_block)?;

let blooms = filter.bloom_possibilities();
let bloom_match = |header: &encoded::Header| {
blooms.iter().any(|bloom| header.log_bloom().contains_bloom(bloom))
};
// If from is greater than to, then the current bloom filter behavior is to just return empty
// result. There's no point to continue here.
if from > to {
return Err(filter.to_block.clone());
}

let (blocks, last_hash) = {
let mut blocks = Vec::new();
let mut current_hash = to_hash;
chain.blocks_with_bloom(&filter.bloom_possibilities(), from, to)
.into_iter()
.filter_map(|n| chain.block_hash(n))
.collect::<Vec<H256>>()
} else {
// Otherwise, we use a slower version that finds a link between from_block and to_block.
let from_hash = match Self::block_hash(&chain, filter.from_block) {
Some(val) => val,
None => return Err(filter.from_block.clone()),
};
let from_number = match chain.block_number(&from_hash) {
Some(val) => val,
None => return Err(BlockId::Hash(from_hash)),
};
let to_hash = match Self::block_hash(&chain, filter.to_block) {
Some(val) => val,
None => return Err(filter.to_block.clone()),
};

loop {
let header = chain.block_header_data(&current_hash)?;
if bloom_match(&header) {
blocks.push(current_hash);
}
let blooms = filter.bloom_possibilities();
let bloom_match = |header: &encoded::Header| {
blooms.iter().any(|bloom| header.log_bloom().contains_bloom(bloom))
};

// Stop if `from` block is reached.
if header.number() <= from_number {
break;
}
current_hash = header.parent_hash();
let (blocks, last_hash) = {
let mut blocks = Vec::new();
let mut current_hash = to_hash;

loop {
let header = match chain.block_header_data(&current_hash) {
Some(val) => val,
None => return Err(BlockId::Hash(current_hash)),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From RPC-side you'll input block numbers for from/to but in this case you would get back an error that is a block hash. I don't think it's very problematic since this error should be rare, since we do range validation before starting the loop. We could coerce the blockid error depending on the type used in filter, i.e. if it uses block numbers return error using block number, otherwise use hash. But maybe this is added complexity for a case that is rare (those blocks should be on your local db and it shouldn't fail).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code only deals with the case where if either from_block or to_block is a block hash. If both of them are numbers (which is the case for RPC's fromBlock and toBlock, then we won't go into this else branch (https://github.com/paritytech/parity-ethereum/pull/9256/files/4b45f9ac913f7ef9d7e3d3cda22a2d0c28dc23ea#diff-40c7c65265e511d07158f62712f69573R1850). If user inputs block numbers, he/she can only get back block numbers as errors. :)

};
if bloom_match(&header) {
blocks.push(current_hash);
}

blocks.reverse();
(blocks, current_hash)
};

// Check if we've actually reached the expected `from` block.
if last_hash != from_hash || blocks.is_empty() {
return None;
// Stop if `from` block is reached.
if header.number() <= from_number {
break;
}
current_hash = header.parent_hash();
}

blocks
blocks.reverse();
(blocks, current_hash)
};

Some(self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit))
// Check if we've actually reached the expected `from` block.
if last_hash != from_hash || blocks.is_empty() {
// In this case, from_hash is the cause (for not matching last_hash).
return Err(BlockId::Hash(from_hash));
}

blocks
};

fetch_logs().unwrap_or_default()
Ok(self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit))
}

fn filter_traces(&self, filter: TraceFilter) -> Option<Vec<LocalizedTrace>> {
Expand Down
19 changes: 16 additions & 3 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct TestBlockChainClient {
pub receipts: RwLock<HashMap<TransactionId, LocalizedReceipt>>,
/// Logs
pub logs: RwLock<Vec<LocalizedLogEntry>>,
/// Should return errors on logs.
pub error_on_logs: RwLock<Option<BlockId>>,
/// Block queue size.
pub queue_size: AtomicUsize,
/// Miner
Expand Down Expand Up @@ -178,6 +180,7 @@ impl TestBlockChainClient {
traces: RwLock::new(None),
history: RwLock::new(None),
disabled: AtomicBool::new(false),
error_on_logs: RwLock::new(None),
};

// insert genesis hash.
Expand Down Expand Up @@ -233,6 +236,11 @@ impl TestBlockChainClient {
*self.logs.write() = logs;
}

/// Set return errors on logs.
pub fn set_error_on_logs(&self, val: Option<BlockId>) {
*self.error_on_logs.write() = val;
}

/// Add blocks to test client.
pub fn add_blocks(&self, count: usize, with: EachBlockWith) {
let len = self.numbers.read().len();
Expand Down Expand Up @@ -665,13 +673,18 @@ impl BlockChainClient for TestBlockChainClient {
self.receipts.read().get(&id).cloned()
}

fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry> {
fn logs(&self, filter: Filter) -> Result<Vec<LocalizedLogEntry>, BlockId> {
match self.error_on_logs.read().as_ref() {
Some(id) => return Err(id.clone()),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to return both the block ID and the block hash?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This BlockId contains BlockId::Number and BlockId::Hash, so it may be fine here -- the tester can choose which type of BlockId it wants to test. :)

None => (),
}

let mut logs = self.logs.read().clone();
let len = logs.len();
match filter.limit {
Ok(match filter.limit {
Some(limit) if limit <= len => logs.split_off(len - limit),
_ => logs,
}
})
}

fn last_hashes(&self) -> LastHashes {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get the registrar address, if it exists.
fn additional_params(&self) -> BTreeMap<String, String>;

/// Returns logs matching given filter.
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry>;
/// Returns logs matching given filter. If one of the filtering block cannot be found, returns the block id that caused the error.
fn logs(&self, filter: Filter) -> Result<Vec<LocalizedLogEntry>, BlockId>;

/// Replays a given transaction for inspection.
fn replay(&self, t: TransactionId, analytics: CallAnalytics) -> Result<Executed, CallError>;
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn returns_logs() {
address: None,
topics: vec![],
limit: None,
});
}).unwrap();
assert_eq!(logs.len(), 0);
}

Expand All @@ -164,7 +164,7 @@ fn returns_logs_with_limit() {
address: None,
topics: vec![],
limit: None,
});
}).unwrap();
assert_eq!(logs.len(), 0);
}

Expand Down
14 changes: 14 additions & 0 deletions rpc/src/v1/helpers/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fmt;

use ethcore::account_provider::{SignError as AccountError};
use ethcore::error::{Error as EthcoreError, ErrorKind, CallError};
use ethcore::client::BlockId;
use jsonrpc_core::{futures, Error, ErrorCode, Value};
use rlp::DecoderError;
use transaction::Error as TransactionError;
Expand Down Expand Up @@ -422,6 +423,19 @@ pub fn filter_not_found() -> Error {
}
}

pub fn filter_block_not_found(id: BlockId) -> Error {
Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), // Specified in EIP-234.
message: "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found".into(),
data: Some(Value::String(match id {
BlockId::Hash(hash) => format!("0x{:x}", hash),
BlockId::Number(number) => format!("0x{:x}", number),
BlockId::Earliest => "earliest".to_string(),
BlockId::Latest => "latest".to_string(),
})),
}
}

// on-demand sender cancelled.
pub fn on_demand_cancel(_cancel: futures::sync::oneshot::Canceled) -> Error {
internal("on-demand sender cancelled", "")
Expand Down
11 changes: 9 additions & 2 deletions rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::{
};
use ethereum_types::H256;
use parking_lot::Mutex;
use v1::types::{Filter, Log};
use ethcore::filter::Filter;
use v1::types::Log;

pub type BlockNumber = u64;

Expand Down Expand Up @@ -52,7 +53,13 @@ pub enum PollFilter {
/// Hashes of all pending transactions the client knows about.
PendingTransaction(BTreeSet<H256>),
/// Number of From block number, last seen block hash, pending logs and log filter itself.
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
Logs {
block_number: BlockNumber,
last_block_hash: Option<H256>,
previous_logs: HashSet<Log>,
filter: Filter,
include_pending: bool,
}
}

/// Returns only last `n` logs
Expand Down
16 changes: 11 additions & 5 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,17 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<

fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>> {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = self.client.logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();
let filter: EthcoreFilter = match filter.try_into() {
Ok(value) => value,
Err(err) => return Box::new(future::err(err)),
};
let mut logs = match self.client.logs(filter.clone()) {
Ok(logs) => logs
.into_iter()
.map(From::from)
.collect::<Vec<Log>>(),
Err(id) => return Box::new(future::err(errors::filter_block_not_found(id))),
};

if include_pending {
let best_block = self.client.chain_info().best_block_number;
Expand Down
36 changes: 21 additions & 15 deletions rpc/src/v1/impls/eth_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
}

fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>> {
Box::new(future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()))
Box::new(future::ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect()))
}

fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec<Log> {
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
filter.from_block = BlockId::Hash(block_hash);
filter.to_block = filter.from_block;

self.client.logs(filter).into_iter().map(|log| {
self.client.logs(filter).unwrap_or_default().into_iter().map(|log| {
let mut log: Log = log.into();
log.log_type = "removed".into();
log.removed = true;
Expand All @@ -140,7 +140,13 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256> {
let mut polls = self.polls().lock();
let block_number = self.best_block_number();
let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs(block_number, None, Default::default(), filter)));
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter = filter.try_into()?;
let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs {
block_number, filter, include_pending,
last_block_hash: None,
previous_logs: Default::default()
}));
Ok(id.into())
}

Expand Down Expand Up @@ -195,15 +201,17 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
// return new hashes
Either::A(future::ok(FilterChanges::Hashes(new_hashes)))
},
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => {
PollFilter::Logs {
ref mut block_number,
ref mut last_block_hash,
ref mut previous_logs,
ref filter,
include_pending,
} => {
// retrive the current block number
let current_number = self.best_block_number();

// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);

// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();
let mut filter = filter.clone();

// retrieve reorg logs
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter));
Expand Down Expand Up @@ -250,21 +258,19 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
}

fn filter_logs(&self, index: Index) -> BoxFuture<Vec<Log>> {
let filter = {
let (filter, include_pending) = {
let mut polls = self.polls().lock();

match polls.poll(&index.value()).and_then(|f| f.modify(|filter| match *filter {
PollFilter::Logs(.., ref filter) => Some(filter.clone()),
PollFilter::Logs { ref filter, include_pending, .. } =>
Some((filter.clone(), include_pending)),
_ => None,
})) {
Some(filter) => filter,
Some((filter, include_pending)) => (filter, include_pending),
None => return Box::new(future::err(errors::filter_not_found())),
}
};

let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();

// fetch pending logs.
let pending = if include_pending {
let best_block = self.best_block_number();
Expand Down
Loading