Skip to content

Commit

Permalink
fix(p2p): open peer before sending session ack
Browse files Browse the repository at this point in the history
This commit addresses an edge case bug whereby one peer finishes the
handshake before the other and begins sending regular packets. It is
possible for the other peer to attempt to process these regular packets
while it is executing asynchronous code to complete the handshake, which
results in the regular packets being discarded as "unsolicited."

The key change is marking the peer as `opened` after receiving and
authenticating the `SessionInitPacket`, as well as validating the peer
(not banned, not ourselves, not an incompatible version, etc...), but
before we send the `SessionAckPacket`. This ensures that we are ready
to accept packets before the peer is ready to send packets.

This involves restructuring the open flow, splitting it into two methods
(`beginOpen` and `completeOpen`). The first establishes the socket
connection and begins the handshake. The second marks the peer as
`opened` and compelts the handshake. In between we validate the peer,
and afterwards we perform the routine for newly opened peers including
sharing and asking for open orders.

Fixes #839.
  • Loading branch information
sangaman committed Mar 19, 2019
1 parent 07b5981 commit 2e6e1e8
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 162 deletions.
3 changes: 3 additions & 0 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ class GrpcService {
case p2pErrorCodes.COULD_NOT_CONNECT:
code = status.UNAVAILABLE;
break;
case p2pErrorCodes.POOL_CLOSED:
code = status.ABORTED;
break;
}

// return a grpc error with the code if we've assigned one, otherwise pass along the caught error as UNKNOWN
Expand Down
181 changes: 100 additions & 81 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,20 @@ import { EventEmitter } from 'events';
import crypto from 'crypto';
import secp256k1 from 'secp256k1';
import stringify from 'json-stable-stringify';
import semver from 'semver';
import { ReputationEvent, DisconnectionReason, NetworkMagic } from '../constants/enums';
import Parser from './Parser';
import * as packets from './packets/types';
import Logger from '../Logger';
import { ms } from '../utils/utils';
import { OutgoingOrder } from '../orderbook/types';
import { Packet, PacketDirection, PacketType } from './packets';
import { NodeState, Address, NodeConnectionInfo, PoolConfig } from './types';
import { NodeState, Address, NodeConnectionInfo } from './types';
import errors, { errorCodes } from './errors';
import addressUtils from '../utils/addressUtils';
import NodeKey from '../nodekey/NodeKey';
import Network from './Network';
import Framer from './Framer';

const minCompatibleVersion: string = require('../../package.json').minCompatibleVersion;

/** Key info about a peer for display purposes */
type PeerInfo = {
address: string,
Expand All @@ -38,11 +35,9 @@ interface Peer {
on(event: 'pairDropped', listener: (pair: string) => void): this;
on(event: 'nodeStateUpdate', listener: () => void): this;
on(event: 'reputation', listener: (event: ReputationEvent) => void): this;
once(event: 'open', listener: () => void): this;
once(event: 'close', listener: () => void): this;
emit(event: 'connect'): boolean;
emit(event: 'reputation', reputationEvent: ReputationEvent): boolean;
emit(event: 'open'): boolean;
emit(event: 'close'): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: 'packet', packet: Packet): boolean;
Expand All @@ -57,10 +52,13 @@ class Peer extends EventEmitter {
public recvDisconnectionReason?: DisconnectionReason;
public sentDisconnectionReason?: DisconnectionReason;
public expectedNodePubKey?: string;
public active = false; // added to peer list
/** Whether the peer is included in the p2p pool list of peers and will receive broadcasted packets. */
public active = false;
/** Timer to periodically call getNodes #402 */
public discoverTimer?: NodeJS.Timer;
/** Whether we have received and authenticated a [[SessionInitPacket]] from the peer. */
private opened = false;
private opening = false;
private socket?: Socket;
private parser: Parser;
private closed = false;
Expand Down Expand Up @@ -94,7 +92,7 @@ class Peer extends EventEmitter {
/** Connection retries max period. */
private static readonly CONNECTION_RETRIES_MAX_PERIOD = 604800000;

private get version(): string {
public get version(): string {
return this.nodeState ? this.nodeState.version : '';
}

Expand Down Expand Up @@ -141,7 +139,7 @@ class Peer extends EventEmitter {
/**
* @param address The socket address for the connection to this peer.
*/
constructor(private logger: Logger, public address: Address, private config: PoolConfig) {
constructor(private logger: Logger, public address: Address) {
super();

this.framer = new Framer(this.network);
Expand All @@ -152,8 +150,8 @@ class Peer extends EventEmitter {
/**
* Creates a Peer from an inbound socket connection.
*/
public static fromInbound = (socket: Socket, logger: Logger, config: PoolConfig): Peer => {
const peer = new Peer(logger, addressUtils.fromSocket(socket), config);
public static fromInbound = (socket: Socket, logger: Logger): Peer => {
const peer = new Peer(logger, addressUtils.fromSocket(socket));

peer.inbound = true;
peer.socket = socket;
Expand Down Expand Up @@ -183,64 +181,49 @@ class Peer extends EventEmitter {
}

/**
* Prepare a connection for use by ensuring it is active, exchanging [[HelloPacket]] with handshake data,
* and emit the `open` event if everything succeeds. Throw an error on unexpected handshake data.
* @param handshakeData our handshake data to send to the peer
* @param nodePubKey the expected nodePubKey of the node we are opening a connection with
* Prepares a peer for use by establishing a socket connection and beginning the handshake.
* @param ownNodeState our node state data to send to the peer
* @param nodeKey our identity node key
* @param expectedNodePubKey the expected nodePubKey of the node we are opening a connection with
* @param retryConnecting whether to retry to connect upon failure
* @returns the session init packet from beginning the handshake
*/
public open = async (ownNodeState: NodeState, nodeKey: NodeKey, expectedNodePubKey?: string, retryConnecting = false): Promise<void> => {
public beginOpen = async (ownNodeState: NodeState, nodeKey: NodeKey, expectedNodePubKey?: string, retryConnecting = false):
Promise<packets.SessionInitPacket> => {
assert(!this.opening);
assert(!this.opened);
assert(!this.closed);
assert(this.inbound || expectedNodePubKey);
assert(!retryConnecting || !this.inbound);

this.opening = true;
this.expectedNodePubKey = expectedNodePubKey;

await this.initConnection(retryConnecting);
this.initStall();

await this.handshake(ownNodeState, nodeKey);

if (this.expectedNodePubKey && this.nodePubKey !== this.expectedNodePubKey) {
await this.close(DisconnectionReason.UnexpectedIdentity);
throw errors.UNEXPECTED_NODE_PUB_KEY(this.nodePubKey!, this.expectedNodePubKey, addressUtils.toString(this.address));
}
return this.beginHandshake(ownNodeState, nodeKey);
}

if (this.nodePubKey === ownNodeState.nodePubKey) {
await this.close(DisconnectionReason.ConnectedToSelf);
throw errors.ATTEMPTED_CONNECTION_TO_SELF;
}
/**
* Finishes opening a peer for use by marking the peer as opened, completing the handshake,
* and setting up the ping packet timer.
* @param ownNodeState our node state data to send to the peer
* @param nodeKey our identity node key
* @param sessionInit the session init packet we received when beginning the handshake
*/
public completeOpen = async (ownNodeState: NodeState, nodeKey: NodeKey, sessionInit: packets.SessionInitPacket) => {
assert(this.opening);
assert(!this.opened);
assert(!this.closed);

// Check if version is semantic, and higher than minCompatibleVersion.
if (!semver.valid(this.version)) {
await this.close(DisconnectionReason.MalformedVersion);
throw errors.MALFORMED_VERSION(addressUtils.toString(this.address), this.version);
}
// dev.note: compare returns 0 if v1 == v2, or 1 if v1 is greater, or -1 if v2 is greater.
if (semver.compare(this.version, minCompatibleVersion) === -1) {
await this.close(DisconnectionReason.IncompatibleProtocolVersion);
throw errors.INCOMPATIBLE_VERSION(addressUtils.toString(this.address), minCompatibleVersion, this.version);
}
this.opening = false;
this.opened = true;

// request peer's known nodes only if p2p.discover option is true
if (this.config.discover) {
await this.sendPacket(new packets.GetNodesPacket());
if (this.config.discoverminutes === 0) {
// timer is disabled
this.discoverTimer = undefined; // defensive programming
} else {
// timer is enabled
this.discoverTimer = setInterval(this.sendGetNodes, this.config.discoverminutes * 1000 * 60);
}
}
await this.completeHandshake(ownNodeState, nodeKey, sessionInit);

// Setup the ping interval
this.pingTimer = setInterval(this.sendPing, Peer.PING_INTERVAL);

// let listeners know that this peer is ready to go
this.opened = true;
this.emit('open');
}

/**
Expand All @@ -252,6 +235,7 @@ class Peer extends EventEmitter {
}

this.closed = true;
this.opened = false;

if (this.socket) {
if (reason !== undefined) {
Expand Down Expand Up @@ -593,30 +577,28 @@ class Peer extends EventEmitter {
});
}

/** Check if a given packet is solicited and fulfill the pending response entry if it's a response. */
private isPacketSolicited = (packet: Packet): boolean => {
let solicited = true;

/** Checks if a given packet is solicited and fulfills the pending response entry if it's a response. */
private isPacketSolicited = async (packet: Packet): Promise<boolean> => {
if (!this.opened && packet.type !== PacketType.SessionInit && packet.type !== PacketType.SessionAck && packet.type !== PacketType.Disconnecting) {
// until the connection is opened, we only accept SessionInit/SessionAck packets
solicited = false;
// until the connection is opened, we only accept SessionInit, SessionAck, and Disconnecting packets
return false;
}
if (packet.direction === PacketDirection.Response) {
// lookup a pending response entry for this packet by its reqId
if (!this.fulfillResponseEntry(packet)) {
solicited = false;
return false;
}
}

return solicited;
return true;
}

private handlePacket = async (packet: Packet): Promise<void> => {
this.lastRecv = Date.now();
const sender = this.nodePubKey !== undefined ? this.nodePubKey : addressUtils.toString(this.address);
this.logger.trace(`Received ${PacketType[packet.type]} packet from ${sender}${JSON.stringify(packet)}`);

if (this.isPacketSolicited(packet)) {
if (await this.isPacketSolicited(packet)) {
switch (packet.type) {
case PacketType.SessionInit: {
this.handleSessionInit(packet);
Expand Down Expand Up @@ -656,21 +638,31 @@ class Peer extends EventEmitter {
}

/**
* Authenticate the identity of the peer through SessionInit packet
* @param {SessionInitPacket} packet
* @param {NodeKey} nodeKey
* Authenticates the identity of a peer with a [[SessionInitPacket]] and sets the peer's node state.
* Throws an error and closes the peer if authentication fails.
* @param packet the session init packet
* @param nodePubKey our node pub key
* @param expectedNodePubKey the expected node pub key of the sender of the init packet
*/
private authenticate = async (packet: packets.SessionInitPacket, nodeKey: NodeKey) => {
private authenticateSessionInit = async (packet: packets.SessionInitPacket, nodePubKey: string, expectedNodePubKey?: string) => {
const body = packet.body!;
const { sign, ...bodyWithoutSign } = body;
const { nodePubKey } = body.nodeState; // the peer pubkey
const { peerPubKey } = body; // our pubkey
/** The pub key of the node that sent the init packet. */
const sourceNodePubKey = body.nodeState.nodePubKey;
/** The pub key of the node that the init packet is intended for. */
const targetNodePubKey = body.peerPubKey;

// verify that msg was intended for us
if (peerPubKey !== nodeKey.nodePubKey) {
// verify that the init packet came from the expected node
if (expectedNodePubKey && expectedNodePubKey !== sourceNodePubKey) {
await this.close(DisconnectionReason.UnexpectedIdentity);
throw errors.UNEXPECTED_NODE_PUB_KEY(sourceNodePubKey, expectedNodePubKey, addressUtils.toString(this.address));
}

// verify that the init packet was intended for us
if (targetNodePubKey !== nodePubKey) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidTarget);
throw errors.AUTH_FAILURE_INVALID_TARGET(nodePubKey, peerPubKey);
throw errors.AUTH_FAILURE_INVALID_TARGET(sourceNodePubKey, targetNodePubKey);
}

// verify that the msg was signed by the peer
Expand All @@ -679,16 +671,22 @@ class Peer extends EventEmitter {
const verified = secp256k1.verify(
msgHash,
Buffer.from(sign, 'hex'),
Buffer.from(nodePubKey, 'hex'),
Buffer.from(sourceNodePubKey, 'hex'),
);

if (!verified) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidSignature);
throw errors.AUTH_FAILURE_INVALID_SIGNATURE(nodePubKey);
throw errors.AUTH_FAILURE_INVALID_SIGNATURE(sourceNodePubKey);
}

// finally set this peer's node state to the node state in the init packet body
this.nodeState = body.nodeState;
}

/**
* Sends a [[SessionInitPacket]] and waits for a [[SessionAckPacket]].
*/
private initSession = async (ownNodeState: NodeState, nodeKey: NodeKey, expectedNodePubKey: string): Promise<void> => {
const ECDH = crypto.createECDH('secp256k1');
const ephemeralPubKey = ECDH.generateKeys().toString('hex');
Expand All @@ -703,10 +701,10 @@ class Peer extends EventEmitter {
});
}

private ackSession = async (sessionInit: packets.SessionInitPacket, nodeKey: NodeKey): Promise<void> => {
await this.authenticate(sessionInit, nodeKey);
this.nodeState = sessionInit.body!.nodeState;

/**
* Sends a [[SessionAckPacket]] in response to a given [[SessionInitPacket]].
*/
private ackSession = async (sessionInit: packets.SessionInitPacket): Promise<void> => {
const ECDH = crypto.createECDH('secp256k1');
const ephemeralPubKey = ECDH.generateKeys().toString('hex');

Expand All @@ -718,17 +716,38 @@ class Peer extends EventEmitter {
this.setOutEncryption(key);
}

private handshake = async (ownNodeState: NodeState, nodeKey: NodeKey) => {
/**
* Begins the handshake by waiting for a [[SessionInitPacket]] as well as sending our own
* [[SessionInitPacket]] first if we are the outbound peer.
* @returns the session init packet we receive
*/
private beginHandshake = async (ownNodeState: NodeState, nodeKey: NodeKey) => {
let sessionInit: packets.SessionInitPacket;
if (!this.inbound) {
// outbound handshake
assert(this.expectedNodePubKey);
await this.initSession(ownNodeState, nodeKey, this.expectedNodePubKey!);
const sessionInit = await this.waitSessionInit();
await this.ackSession(sessionInit, nodeKey);
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, nodeKey.nodePubKey, this.expectedNodePubKey);
} else {
// inbound handshake
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, nodeKey.nodePubKey);
}
return sessionInit;
}

/**
* Completes the handshake by sending the [[SessionAckPacket]] and our [[SessionInitPacket]] if it
* has not been sent already, as is the case with inbound peers.
*/
private completeHandshake = async (ownNodeState: NodeState, nodeKey: NodeKey, sessionInit: packets.SessionInitPacket) => {
if (!this.inbound) {
// outbound handshake
await this.ackSession(sessionInit);
} else {
// inbound handshake
const sessionInit = await this.waitSessionInit();
await this.ackSession(sessionInit, nodeKey);
await this.ackSession(sessionInit);
await this.initSession(ownNodeState, nodeKey, sessionInit.body!.nodeState.nodePubKey);
}
}
Expand All @@ -738,7 +757,7 @@ class Peer extends EventEmitter {
await this.sendPacket(packet);
}

private sendGetNodes = async (): Promise<void> => {
public sendGetNodes = async (): Promise<void> => {
const packet = new packets.GetNodesPacket();
await this.sendPacket(packet);
}
Expand Down
Loading

0 comments on commit 2e6e1e8

Please sign in to comment.