Skip to content

Commit

Permalink
Merge pull request #1120 from ethereum/fix/rc-active-peers
Browse files Browse the repository at this point in the history
Fix/rc active peers
  • Loading branch information
mkalinin authored Jul 6, 2018
2 parents 5913b62 + 051fd73 commit f93d484
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public void setChannel(Channel channel) {
}

public void sendMessage(Message msg) {
if (channel.isDisconnected()) {
logger.warn("{}: attempt to send [{}] message after disconnect", channel, msg.getCommand().name());
return;
}

if (msg instanceof PingMessage) {
if (hasPing) return;
hasPing = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,44 +154,48 @@ public Set<String> nodesInUse() {
}

private void processNewPeers() {
if (newPeers.isEmpty()) return;
List<Runnable> noLockTasks = new ArrayList<>();

List<Channel> processed = new ArrayList<>();
synchronized (this) {
if (newPeers.isEmpty()) return;

int addCnt = 0;
for(Channel peer : newPeers) {
List<Channel> processed = new ArrayList<>();
int addCnt = 0;
for (Channel peer : newPeers) {

logger.debug("Processing new peer: " + peer);
logger.debug("Processing new peer: " + peer);

if(peer.isProtocolsInitialized()) {
if (peer.isProtocolsInitialized()) {

logger.debug("Protocols initialized");
logger.debug("Protocols initialized");

if (!activePeers.containsKey(peer.getNodeIdWrapper())) {
if (!peer.isActive() &&
activePeers.size() >= maxActivePeers &&
!trustedPeers.accept(peer.getNode())) {
if (!activePeers.containsKey(peer.getNodeIdWrapper())) {
if (!peer.isActive() &&
activePeers.size() >= maxActivePeers &&
!trustedPeers.accept(peer.getNode())) {

// restricting inbound connections unless this is a trusted peer
// restricting inbound connections unless this is a trusted peer

disconnect(peer, TOO_MANY_PEERS);
noLockTasks.add(() -> disconnect(peer, TOO_MANY_PEERS));
} else {
process(peer);
}
} else {
process(peer);
addCnt++;
noLockTasks.add(() -> disconnect(peer, DUPLICATE_PEER));
}
} else {
disconnect(peer, DUPLICATE_PEER);

processed.add(peer);
}
}

processed.add(peer);
if (addCnt > 0) {
logger.info("New peers processed: " + processed + ", active peers added: " + addCnt + ", total active peers: " + activePeers.size());
}
}

if (addCnt > 0) {
logger.info("New peers processed: " + processed + ", active peers added: " + addCnt + ", total active peers: " + activePeers.size());
newPeers.removeAll(processed);
}

newPeers.removeAll(processed);
noLockTasks.forEach(Runnable::run);
}

public void disconnect(Channel peer, ReasonCode reason) {
Expand Down Expand Up @@ -339,7 +343,7 @@ private void sendNewBlock(Block block, Channel receivedFrom) {
}
}

public void add(Channel peer) {
public synchronized void add(Channel peer) {
logger.debug("New peer in ChannelManager {}", peer);
newPeers.add(peer);
}
Expand All @@ -348,8 +352,10 @@ public void notifyDisconnect(Channel channel) {
logger.debug("Peer {}: notifies about disconnect", channel);
channel.onDisconnect();
syncPool.onDisconnect(channel);
activePeers.values().remove(channel);
newPeers.remove(channel);
synchronized(this) {
activePeers.values().remove(channel);
newPeers.remove(channel);
}
}

public void onSyncDone(boolean done) {
Expand Down

0 comments on commit f93d484

Please sign in to comment.