Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 87 additions & 69 deletions crates/rpc/rpc/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ where
) -> Result<Vec<LocalizedTransactionTrace>, Eth::Error> {
// We'll reuse the matcher across multiple blocks that are traced in parallel
let matcher = Arc::new(filter.matcher());
let TraceFilter { from_block, to_block, after, count, .. } = filter;
let TraceFilter { from_block, to_block, mut after, count, .. } = filter;
let start = from_block.unwrap_or(0);

let latest_block = self.provider().best_block_number().map_err(Eth::Error::from_eth_err)?;
Expand All @@ -389,80 +389,97 @@ where
.into())
}

// fetch all blocks in that range
let blocks = self
.provider()
.recovered_block_range(start..=end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();

// trace all blocks
let mut block_traces = Vec::with_capacity(blocks.len());
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
block_traces.push(traces);
}

let block_traces = futures::future::try_join_all(block_traces).await?;
let mut all_traces = block_traces
.into_iter()
.flatten()
.flat_map(|traces| traces.into_iter().flatten().flat_map(|traces| traces.into_iter()))
.collect::<Vec<_>>();

// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
let mut all_traces = Vec::new();
let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests);
for chunk_start in (start..end).step_by(self.inner.eth_config.max_tracing_requests) {
let chunk_end =
std::cmp::min(chunk_start + self.inner.eth_config.max_tracing_requests as u64, end);

// fetch all blocks in that chunk
let blocks = self
.eth_api()
.spawn_blocking_io(move |this| {
Ok(this
.provider()
.recovered_block_range(chunk_start..=chunk_end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>())
})
.await?;

// trace all blocks
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
} else {
// no block reward, means we're past the Paris hardfork and don't expect any rewards
// because the blocks in ascending order
break
block_traces.push(traces);
}
}

// Skips the first `after` number of matching traces.
// If `after` is greater than or equal to the number of matched traces, it returns an empty
// array.
if let Some(after) = after.map(|a| a as usize) {
if after < all_traces.len() {
all_traces.drain(..after);
} else {
return Ok(vec![])
#[allow(clippy::iter_with_drain)]
let block_traces = futures::future::try_join_all(block_traces.drain(..)).await?;
all_traces.extend(block_traces.into_iter().flatten().flat_map(|traces| {
traces.into_iter().flatten().flat_map(|traces| traces.into_iter())
}));

// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
);
} else {
// no block reward, means we're past the Paris hardfork and don't expect any
// rewards because the blocks in ascending order
break
}
}
}

// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
// Skips the first `after` number of matching traces.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff < all_traces.len()
{
all_traces.drain(..cutoff);
// we removed the first `after` traces
after = None;
}
};

// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
return Ok(all_traces)
}
};
}

// If `after` is greater than or equal to the number of matched traces, it returns an
// empty array.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff >= all_traces.len()
{
return Ok(vec![])
}

Ok(all_traces)
}
Expand Down Expand Up @@ -692,6 +709,7 @@ where
/// # Limitations
/// This currently requires block filter fields, since reth does not have address indices yet.
async fn trace_filter(&self, filter: TraceFilter) -> RpcResult<Vec<LocalizedTransactionTrace>> {
let _permit = self.inner.blocking_task_guard.clone().acquire_many_owned(2).await;
Ok(Self::trace_filter(self, filter).await.map_err(Into::into)?)
}

Expand Down