Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ethcore/engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ pub trait Engine: Sync + Send {
self.machine().verify_transaction_basic(t, header)
}

/// Performs pre-validation of RLP decoded transaction before other processing
/// Performs pre-validation of RLP encoded transaction before other
/// processing: check length against `max_transaction_size` and decode the
/// RLP.
fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
self.machine().decode_transaction(transaction)
}
Expand Down
4 changes: 3 additions & 1 deletion ethcore/machine/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ impl Machine {
Ok(())
}

/// Performs pre-validation of RLP decoded transaction before other processing
/// Performs pre-validation of RLP encoded transaction before other
/// processing: check length against `max_transaction_size` and decode the
/// RLP.
pub fn decode_transaction(&self, transaction: &[u8]) -> Result<UnverifiedTransaction, transaction::Error> {
let rlp = Rlp::new(&transaction);
if rlp.as_raw().len() > self.params().max_transaction_size {
Expand Down
12 changes: 8 additions & 4 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,7 @@ impl IoClient for Client {
fn queue_transactions(&self, transactions: Vec<Bytes>, peer_id: usize) {
trace_time!("queue_transactions");
let len = transactions.len();
self.queue_transactions.queue(&self.io_channel.read(), len, move |client| {
self.queue_transactions.enqueue(&self.io_channel.read(), len, move |client| {
trace_time!("import_queued_transactions");

let txs: Vec<UnverifiedTransaction> = transactions
Expand Down Expand Up @@ -2231,7 +2231,7 @@ impl IoClient for Client {

let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
self.queue_ancient_blocks.enqueue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
Expand Down Expand Up @@ -2265,7 +2265,7 @@ impl IoClient for Client {
}

fn queue_consensus_message(&self, message: Bytes) {
match self.queue_consensus_message.queue(&self.io_channel.read(), 1, move |client| {
match self.queue_consensus_message.enqueue(&self.io_channel.read(), 1, move |client| {
if let Err(e) = client.engine().handle_message(&message) {
debug!(target: "poa", "Invalid message received: {}", e);
}
Expand Down Expand Up @@ -2798,7 +2798,11 @@ impl IoChannelQueue {
}
}

pub fn queue<F>(&self, channel: &IoChannel<ClientIoMessage<Client>>, count: usize, fun: F) -> EthcoreResult<()> where
/// Try to to add an item to the queue for deferred processing by the IO
/// client. Messages take the form of `Fn` closures that carry a `Client`
/// reference with them. Enqueuing a message can fail if the queue is full
/// or if the `send()` on the `IoChannel` fails.
pub fn enqueue<F>(&self, channel: &IoChannel<ClientIoMessage<Client>>, count: usize, fun: F) -> EthcoreResult<()> where
Comment thread
niklasad1 marked this conversation as resolved.
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ impl BlockChainClient for TestBlockChainClient {
impl IoClient for TestBlockChainClient {
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: usize) {
// import right here
let txs = transactions.into_iter().filter_map(|bytes| Rlp::new(&bytes).as_val().ok()).collect();
let txs = transactions.iter().filter_map(|bytes| Rlp::new(bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
}

Expand Down
6 changes: 3 additions & 3 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ impl SyncHandler {
}

/// Called when peer sends us new transactions
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), PacketDecodeError> {
pub fn on_peer_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: Rlp) -> Result<(), PacketDecodeError> {
// Accept transactions only when fully synced
if !io.is_chain_queue_empty() || (sync.state != SyncState::Idle && sync.state != SyncState::NewBlocks) {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
Expand All @@ -686,11 +686,11 @@ impl SyncHandler {
return Ok(());
}

let item_count = r.item_count()?;
let item_count = tx_rlp.item_count()?;
trace!(target: "sync", "{:02} -> Transactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. item_count {
let rlp = r.at(i)?;
let rlp = tx_rlp.at(i)?;
let tx = rlp.as_raw().to_vec();
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to get rid of this copy here, but I can't make it work.

Copy link
Copy Markdown
Collaborator

@niklasad1 niklasad1 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is possible because rlpin::Rlp is essentially a wrapper over &[u8], so one way or another you need to allocate to get Vec<u8> in safe Rust.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my plan was to defer the allocation until we transform the rlp bytes into UnverifiedTransactions. But then the transform to UnverifiedTransaction has to happen outside the ClientIoMessage::Execute closure. And even then, that closure is an Fn and changing that to an FnOnce doesn't work because it can't be called when boxed and at that point I gave up.

transactions.push(tx);
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ impl ChainSync {
self.transactions_stats.stats()
}

/// Updates transactions were received by a peer
/// Updates the set of transactions recently sent to this peer to avoid spamming.
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SyncRequester {
let mut rlp = RlpStream::new_list(hashes.len());
trace!(target: "sync", "{} <- GetBlockBodies: {} entries starting from {:?}, set = {:?}", peer_id, hashes.len(), hashes.first(), set);
for h in &hashes {
rlp.append(&h.clone());
rlp.append(h);
}
SyncRequester::send_request(sync, io, peer_id, PeerAsking::BlockBodies, GetBlockBodiesPacket, rlp.out());
let peer = sync.peers.get_mut(&peer_id).expect("peer_id may originate either from on_packet, where it is already validated or from enumerating self.peers. qed");
Expand Down
2 changes: 1 addition & 1 deletion ethcore/sync/src/chain/supplier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl SyncSupplier {
TransactionsPacket => {
let res = {
let sync_ro = sync.read();
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, &rlp)
SyncHandler::on_peer_transactions(&*sync_ro, io, peer, rlp)
};
if res.is_err() {
// peer sent invalid data, disconnect.
Expand Down
6 changes: 3 additions & 3 deletions ethcore/types/src/io_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ pub enum ClientIoMessage<C> {
FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number.
TakeSnapshot(u64),
/// Execute wrapped closure
/// Execute wrapped Fn closure
Execute(Callback<C>),
}

impl<C> ClientIoMessage<C> {
/// Create new `ClientIoMessage` that executes given procedure.
/// Create new `ClientIoMessage` that can execute the wrapped Fn closure.
pub fn execute<F: Fn(&C) + Send + Sync + 'static>(fun: F) -> Self {
ClientIoMessage::Execute(Callback(Box::new(fun)))
}
}

/// A function to invoke in the client thread.
/// A wrapper around an Fn closure to invoke in the client thread.
pub struct Callback<C>(pub Box<dyn Fn(&C) + Send + Sync>);

impl<C> fmt::Debug for Callback<C> {
Expand Down