Skip to content

Commit

Permalink
fix(p2p): remove error event from Peer (#1299)
Browse files Browse the repository at this point in the history
fix(p2p): remove error event from Peer
  • Loading branch information
sangaman authored Oct 17, 2019
2 parents 551f1a3 + 8927a09 commit 086457e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 68 deletions.
75 changes: 26 additions & 49 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type PeerInfo = {

interface Peer {
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'reputation', listener: (event: ReputationEvent) => void): this;
/** Adds a listener to be called when the peer's advertised but inactive pairs should be verified. */
on(event: 'verifyPairs', listener: () => void): this;
Expand All @@ -44,7 +43,6 @@ interface Peer {
emit(event: 'connect'): boolean;
emit(event: 'reputation', reputationEvent: ReputationEvent): boolean;
emit(event: 'close'): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: 'packet', packet: Packet): boolean;
/** Notifies listeners that the peer's advertised but inactive pairs should be verified. */
emit(event: 'verifyPairs'): boolean;
Expand Down Expand Up @@ -246,8 +244,7 @@ class Peer extends EventEmitter {
expectedNodePubKey?: string,
/** Whether to retry to connect upon failure. */
retryConnecting?: boolean,
}):
Promise<packets.SessionInitPacket> => {
}): Promise<packets.SessionInitPacket> => {
assert(!this.opening);
assert(!this.opened);
assert(!this.closed);
Expand Down Expand Up @@ -300,13 +297,13 @@ class Peer extends EventEmitter {
this.opened = false;

if (this.socket) {
if (reason !== undefined) {
this.logger.debug(`Peer (${ this.label }): closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;
await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }));
}

if (!this.socket.destroyed) {
if (reason !== undefined) {
this.logger.debug(`Peer (${this.label}): closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;
await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }));
}

this.socket.destroy();
}
delete this.socket;
Expand Down Expand Up @@ -359,13 +356,20 @@ class Peer extends EventEmitter {
}

public sendPacket = async (packet: Packet): Promise<void> => {
const data = await this.framer.frame(packet, this.outEncryptionKey);
this.sendRaw(data);

this.logger.trace(`Sent ${PacketType[packet.type]} packet to ${this.label}: ${JSON.stringify(packet)}`);
if (this.socket && !this.socket.destroyed) {
const data = await this.framer.frame(packet, this.outEncryptionKey);
try {
this.socket.write(data);
this.logger.trace(`Sent ${PacketType[packet.type]} packet to ${this.label}: ${JSON.stringify(packet)}`);

if (packet.direction === PacketDirection.Request) {
this.addResponseTimeout(packet.header.id, packet.responseType, Peer.RESPONSE_TIMEOUT);
if (packet.direction === PacketDirection.Request) {
this.addResponseTimeout(packet.header.id, packet.responseType, Peer.RESPONSE_TIMEOUT);
}
} catch (err) {
this.logger.error(`failed sending data to ${this.label}`, err);
}
} else {
this.logger.trace(`could not send ${PacketType[packet.type]} packet to ${this.label}: ${JSON.stringify(packet)}`);
}
}

Expand Down Expand Up @@ -438,16 +442,6 @@ class Peer extends EventEmitter {
return;
}

private sendRaw = (data: Buffer) => {
if (this.socket && !this.socket.destroyed) {
try {
this.socket.write(data);
} catch (err) {
this.logger.error(`failed sending data to ${this.label}`, err);
}
}
}

/**
* Ensure we are connected (for inbound connections) or listen for the `connect` socket event (for outbound connections)
* and set the [[connectTime]] timestamp. If an outbound connection attempt errors or times out, throw an error.
Expand Down Expand Up @@ -580,7 +574,7 @@ class Peer extends EventEmitter {
if (now > entry.timeout) {
const request = PacketType[parseInt(packetId, 10)] || packetId;
const err = errors.RESPONSE_TIMEOUT(request);
this.emitError(err.message);
this.logger.error(`Peer timed out waiting for response to packet ${packetId}`);
entry.reject(err);
await this.close(DisconnectionReason.ResponseStalling, packetId);
}
Expand All @@ -590,15 +584,11 @@ class Peer extends EventEmitter {
/**
* Wait for a packet to be received from peer.
*/
private addResponseTimeout = (reqId: string, resType: ResponseType, timeout: number): PendingResponseEntry | undefined => {
if (this.closed) {
return undefined;
private addResponseTimeout = (reqId: string, resType: ResponseType, timeout: number) => {
if (!this.closed) {
const entry = this.getOrAddPendingResponseEntry(reqId, resType);
entry.setTimeout(timeout);
}

const entry = this.getOrAddPendingResponseEntry(reqId, resType);
entry.setTimeout(timeout);

return entry;
}

private getOrAddPendingResponseEntry = (reqId: string, resType: ResponseType): PendingResponseEntry => {
Expand Down Expand Up @@ -656,8 +646,7 @@ class Peer extends EventEmitter {
assert(this.socket);

this.socket!.once('error', (err) => {
this.emitError(err);
// socket close event will be called immediately after the socket error
this.logger.error(`Peer (${this.label}) error`, err);
});

this.socket!.once('close', async (hadError) => {
Expand Down Expand Up @@ -748,18 +737,6 @@ class Peer extends EventEmitter {
}
}

private emitError = (err: Error | string): void => {
if (this.closed) {
return;
}

if (err instanceof Error) {
this.emit('error', err);
} else {
this.emit('error', new Error(err));
}
}

/**
* Authenticates the identity of a peer with a [[SessionInitPacket]] and sets the peer's node state.
* Throws an error and closes the peer if authentication fails.
Expand Down
30 changes: 11 additions & 19 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class Pool extends EventEmitter {
* @param retryConnecting whether to attempt retry connecting, defaults to false
* @returns a promise that will resolve when all outbound connections resolve
*/
private connectNodes = (nodes: NodeConnectionIterator, allowKnown = true, retryConnecting = false) => {
private connectNodes = async (nodes: NodeConnectionIterator, allowKnown = true, retryConnecting = false) => {
const connectionPromises: Promise<void>[] = [];
nodes.forEach((node) => {
// check that this node is not ourselves
Expand All @@ -321,7 +321,7 @@ class Pool extends EventEmitter {
connectionPromises.push(this.tryConnectNode(node, retryConnecting));
}
});
return Promise.all(connectionPromises);
await Promise.all(connectionPromises);
}

/**
Expand Down Expand Up @@ -424,6 +424,8 @@ class Pool extends EventEmitter {
}

const peer = new Peer(this.logger, address, this.network);
this.bindPeer(peer);

this.pendingOutboundPeers.set(nodePubKey, peer);
await this.openPeer(peer, nodePubKey, retryConnecting);
return peer;
Expand Down Expand Up @@ -470,7 +472,6 @@ class Pool extends EventEmitter {
// if we are disconnected or disconnecting, don't open new connections
throw errors.POOL_CLOSED;
}
this.bindPeer(peer);

try {
const sessionInit = await peer.beginOpen({
Expand Down Expand Up @@ -648,6 +649,7 @@ class Pool extends EventEmitter {

private addInbound = async (socket: Socket) => {
const peer = Peer.fromInbound(socket, this.logger, this.network);
this.bindPeer(peer);
this.pendingInboundPeers.add(peer);
await this.tryOpenPeer(peer);
this.pendingInboundPeers.delete(peer);
Expand Down Expand Up @@ -820,9 +822,7 @@ class Pool extends EventEmitter {
this.logger.error(err);
});

this.server!.on('connection', async (socket) => {
await this.handleSocket(socket);
});
this.server!.on('connection', this.handleSocket);
}

private bindPeer = (peer: Peer) => {
Expand All @@ -846,14 +846,6 @@ class Pool extends EventEmitter {
this.emit('peer.nodeStateUpdate', peer);
});

peer.on('error', (err) => {
// The only situation in which the node should be connected to itself is the
// reachability check of the advertised addresses and we don't have to log that
if (peer.nodePubKey !== this.nodePubKey) {
this.logger.error(`Peer (${peer.label}): error: ${err.message}`);
}
});

peer.once('close', () => this.handlePeerClose(peer));

peer.on('reputation', async (event) => {
Expand All @@ -869,17 +861,17 @@ class Pool extends EventEmitter {
this.pendingOutboundPeers.delete(peer.expectedNodePubKey);
}

if (!peer.active) {
return;
}

if (peer.nodePubKey) {
this.pendingOutboundPeers.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer.nodePubKey);
peer.removeAllListeners();

if (!peer.active) {
return;
}
peer.active = false;
this.emit('peer.close', peer.nodePubKey);

const shouldReconnect =
(peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) &&
Expand Down

0 comments on commit 086457e

Please sign in to comment.