Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions ethcore/node-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,45 @@ extern crate tempdir;
#[macro_use]
extern crate log;

use std::collections::{HashMap, VecDeque};
use std::sync::Weak;

use ethcore::client::{BlockChainClient, BlockId};
use ethcore::client::{BlockChainClient, BlockId, ChainNotify, NewBlocks};

use ethereum_types::{H256, Address};
use ethabi::FunctionOutputDecoder;
use network::{ConnectionFilter, ConnectionDirection};
use devp2p::NodeId;
use devp2p::MAX_NODES_IN_TABLE;
use parking_lot::RwLock;

use_contract!(peer_set, "res/peer_set.json");

/// Connection filter that uses a contract to manage permissions.
pub struct NodeFilter {
client: Weak<BlockChainClient>,
contract_address: Address,
cache: RwLock<Cache>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we understand the real reason for CPU overload (not limited Node table), can we just rollback #10143 and return previous LRUCache, that was used here? I frankly don't see the advantages of proposed solution in comparison with the previous one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of implementing it via RwLock was to make it possible to read the cache from different threads without blocking each other.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Than may be just change previous Mutex to RwLock? Because order field in Cache is just the implementation of Lru

Copy link
Copy Markdown
Contributor Author

@VladLupashevskyi VladLupashevskyi Feb 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! I think that makes sense. Sorry I didn't notice and I was thinking that LruCache comes already with built-in Mutex 😅

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I cannot really use LruCache since it does not provide non-mutable get method, so it's not possible to use RwLock with it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

struct Cache {
cache: HashMap<NodeId, bool>,
order: VecDeque<NodeId>
}

// Increase cache size due to possible reserved peers, which do not count in the node table size
pub const CACHE_SIZE: usize = MAX_NODES_IN_TABLE + 1024;
Comment thread
VladLupashevskyi marked this conversation as resolved.

impl NodeFilter {
/// Create a new instance. Accepts a contract address.
pub fn new(client: Weak<BlockChainClient>, contract_address: Address) -> NodeFilter {
NodeFilter {
client,
contract_address,
cache: RwLock::new(Cache{
cache: HashMap::with_capacity(CACHE_SIZE),
order: VecDeque::with_capacity(CACHE_SIZE)
})
}
}
}
Expand All @@ -70,6 +87,10 @@ impl ConnectionFilter for NodeFilter {
None => return false,
};

if let Some(allowed) = self.cache.read().cache.get(connecting_id) {
return *allowed;
}

let address = self.contract_address;
let own_low = H256::from_slice(&own_id[0..32]);
let own_high = H256::from_slice(&own_id[32..64]);
Expand All @@ -83,11 +104,26 @@ impl ConnectionFilter for NodeFilter {
debug!("Error callling peer set contract: {:?}", e);
false
});

let mut cache = self.cache.write();
if cache.cache.len() == CACHE_SIZE {
let poped = cache.order.pop_front().unwrap();
cache.cache.remove(&poped).is_none();
};
if cache.cache.insert(*connecting_id, allowed).is_none() {
cache.order.push_back(*connecting_id);
}
allowed
}
}

impl ChainNotify for NodeFilter {
fn new_blocks(&self, _new_blocks: NewBlocks) {
let mut cache = self.cache.write();
cache.cache.clear();
cache.order.clear();
}
}

#[cfg(test)]
mod test {
use std::sync::{Arc, Weak};
Expand Down
4 changes: 3 additions & 1 deletion parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,9 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
let private_tx_provider = private_tx_service.provider();
let connection_filter = connection_filter_address.map(|a| Arc::new(NodeFilter::new(Arc::downgrade(&client) as Weak<BlockChainClient>, a)));
let snapshot_service = service.snapshot_service();

if let Some(filter) = connection_filter.clone() {
service.add_notify(filter.clone());
}
// initialize the local node information store.
let store = {
let db = service.db();
Expand Down
4 changes: 2 additions & 2 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ impl Host {

let socket = {
let address = {
let mut nodes = self.nodes.write();
if let Some(node) = nodes.get_mut(id) {
let mut nodes = self.nodes.read();
if let Some(node) = nodes.get(id) {
node.endpoint.address
} else {
debug!(target: "network", "Connection to expired node aborted");
Expand Down
2 changes: 1 addition & 1 deletion util/network-devp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,6 @@ pub use service::NetworkService;
pub use host::NetworkContext;

pub use io::TimerToken;
pub use node_table::{validate_node_url, NodeId};
pub use node_table::{validate_node_url, NodeId, MAX_NODES_IN_TABLE};

const PROTOCOL_VERSION: u32 = 5;
Loading