From 4667ddf90530d2c2b2734b71c6f8b335d763891d Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Fri, 23 Apr 2021 13:41:16 +0200 Subject: [PATCH 1/2] client: fixed server/service concurrency leading to unreliably picking up bootstrap connections --- packages/client/lib/client.ts | 12 ++++++++++-- packages/client/lib/net/server/rlpxserver.ts | 5 ----- packages/client/lib/net/server/server.ts | 9 +++++++++ packages/client/lib/service/service.ts | 1 - 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/packages/client/lib/client.ts b/packages/client/lib/client.ts index 2b3ece88f08..7b69e81ec00 100644 --- a/packages/client/lib/client.ts +++ b/packages/client/lib/client.ts @@ -106,8 +106,16 @@ export default class EthereumClient extends events.EventEmitter { if (this.started) { return false } - await Promise.all(this.config.servers.map((s) => s.start())) - await Promise.all(this.services.map((s) => s.start())) + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.all(this.services.map((s) => s.start())) + .then(() => { + return Promise.all(this.config.servers.map((s) => s.start())) + }) + .then(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + Promise.all(this.config.servers.map((s) => s.bootstrap())) + }) + this.started = true } diff --git a/packages/client/lib/net/server/rlpxserver.ts b/packages/client/lib/net/server/rlpxserver.ts index c5bf5ac2f85..70544f5ab40 100644 --- a/packages/client/lib/net/server/rlpxserver.ts +++ b/packages/client/lib/net/server/rlpxserver.ts @@ -120,11 +120,6 @@ export class RlpxServer extends Server { this.initDpt() this.initRlpx() this.started = true - // Boostrapping is technically not needed for a server start - // (this is a repeated process) and setting `started` to `true` - // before allows other services to resolve earlier and makes - // the sync pick-up more reliable - await this.bootstrap() return true } diff --git a/packages/client/lib/net/server/server.ts b/packages/client/lib/net/server/server.ts index caa45ceda89..3b90adb877b 100644 --- a/packages/client/lib/net/server/server.ts +++ b/packages/client/lib/net/server/server.ts @@ -81,6 +81,15 @@ export class Server extends EventEmitter { return true } + /** + * Server bootstrap, on Libp2p this is currently done implicitly along the + * server start which might have negative implications for initial server + * connections due to concurrency reasons. + * + * TODO: extract Libp2p bootstrap from server.start() + */ + async bootstrap(): Promise {} + /** * Stop server. Returns a promise that resolves once server has been stopped. */ diff --git a/packages/client/lib/service/service.ts b/packages/client/lib/service/service.ts index 1c90674c5ee..ea8d9679a0a 100644 --- a/packages/client/lib/service/service.ts +++ b/packages/client/lib/service/service.ts @@ -109,7 +109,6 @@ export class Service extends events.EventEmitter { if (this.running) { return false } - await Promise.all(this.config.servers.map((s) => s.start())) this.running = true this.config.logger.info(`Started ${this.name} service.`) } From 0297fd5093d21a0831ddab9d90e2471c4e2e9b42 Mon Sep 17 00:00:00 2001 From: Ryan Ghods Date: Sun, 25 Apr 2021 19:54:31 -0700 Subject: [PATCH 2/2] * await rlpx init (fixes init race condition) * fix tests * simplify promises * improve libp2p boostrap comment --- packages/client/lib/client.ts | 13 +- packages/client/lib/net/server/rlpxserver.ts | 119 +++++++++--------- packages/client/lib/net/server/server.ts | 7 +- packages/client/test/client.spec.ts | 3 + packages/client/test/integration/util.ts | 1 + .../client/test/net/server/rlpxserver.spec.ts | 13 +- .../test/service/fullethereumservice.spec.ts | 2 - .../test/service/lightethereumservice.spec.ts | 1 - 8 files changed, 79 insertions(+), 80 deletions(-) diff --git a/packages/client/lib/client.ts b/packages/client/lib/client.ts index 7b69e81ec00..a9d99800ffa 100644 --- a/packages/client/lib/client.ts +++ b/packages/client/lib/client.ts @@ -106,16 +106,9 @@ export default class EthereumClient extends events.EventEmitter { if (this.started) { return false } - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.all(this.services.map((s) => s.start())) - .then(() => { - return Promise.all(this.config.servers.map((s) => s.start())) - }) - .then(() => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.all(this.config.servers.map((s) => s.bootstrap())) - }) - + await Promise.all(this.services.map((s) => s.start())) + await Promise.all(this.config.servers.map((s) => s.start())) + await Promise.all(this.config.servers.map((s) => s.bootstrap())) this.started = true } diff --git a/packages/client/lib/net/server/rlpxserver.ts b/packages/client/lib/net/server/rlpxserver.ts index 70544f5ab40..eb1091110c2 100644 --- a/packages/client/lib/net/server/rlpxserver.ts +++ b/packages/client/lib/net/server/rlpxserver.ts @@ -118,7 +118,7 @@ export class RlpxServer extends Server { } await super.start() this.initDpt() - this.initRlpx() + await this.initRlpx() this.started = true return true @@ -231,70 +231,73 @@ export class RlpxServer extends Server { * Initializes RLPx instance for peer management * @private */ - initRlpx() { - this.rlpx = new Devp2pRLPx(this.key, { - dpt: this.dpt!, - maxPeers: this.config.maxPeers, - capabilities: RlpxPeer.capabilities(Array.from(this.protocols)), - remoteClientIdFilter: this.clientFilter, - listenPort: this.config.port, - common: this.config.chainCommon, - }) + async initRlpx() { + return new Promise((resolve) => { + this.rlpx = new Devp2pRLPx(this.key, { + dpt: this.dpt!, + maxPeers: this.config.maxPeers, + capabilities: RlpxPeer.capabilities(Array.from(this.protocols)), + remoteClientIdFilter: this.clientFilter, + listenPort: this.config.port, + common: this.config.chainCommon, + }) - this.rlpx.on('peer:added', async (rlpxPeer: Devp2pRLPxPeer) => { - const peer = new RlpxPeer({ - config: this.config, - id: rlpxPeer.getId()!.toString('hex'), - host: rlpxPeer._socket.remoteAddress!, - port: rlpxPeer._socket.remotePort!, - protocols: Array.from(this.protocols), - // @ts-ignore: Property 'server' does not exist on type 'Socket'. - // TODO: check this error - inbound: !!rlpxPeer._socket.server, + this.rlpx.on('peer:added', async (rlpxPeer: Devp2pRLPxPeer) => { + const peer = new RlpxPeer({ + config: this.config, + id: rlpxPeer.getId()!.toString('hex'), + host: rlpxPeer._socket.remoteAddress!, + port: rlpxPeer._socket.remotePort!, + protocols: Array.from(this.protocols), + // @ts-ignore: Property 'server' does not exist on type 'Socket'. + // TODO: check this error + inbound: !!rlpxPeer._socket.server, + }) + try { + await peer.accept(rlpxPeer, this) + this.peers.set(peer.id, peer) + this.config.logger.debug(`Peer connected: ${peer}`) + this.emit('connected', peer) + } catch (error) { + this.error(error) + } }) - try { - await peer.accept(rlpxPeer, this) - this.peers.set(peer.id, peer) - this.config.logger.debug(`Peer connected: ${peer}`) - this.emit('connected', peer) - } catch (error) { - this.error(error) - } - }) - this.rlpx.on('peer:removed', (rlpxPeer: Devp2pRLPxPeer, reason: any) => { - const id = (rlpxPeer.getId() as Buffer).toString('hex') - const peer = this.peers.get(id) - if (peer) { - this.peers.delete(peer.id) - this.config.logger.debug( - `Peer disconnected (${rlpxPeer.getDisconnectPrefix(reason)}): ${peer}` - ) - this.emit('disconnected', peer) - } - }) + this.rlpx.on('peer:removed', (rlpxPeer: Devp2pRLPxPeer, reason: any) => { + const id = (rlpxPeer.getId() as Buffer).toString('hex') + const peer = this.peers.get(id) + if (peer) { + this.peers.delete(peer.id) + this.config.logger.debug( + `Peer disconnected (${rlpxPeer.getDisconnectPrefix(reason)}): ${peer}` + ) + this.emit('disconnected', peer) + } + }) - this.rlpx.on('peer:error', (rlpxPeer: any, error: Error) => { - const peerId = rlpxPeer && rlpxPeer.getId() - if (!peerId) { - return this.error(error) - } - const id = peerId.toString('hex') - const peer = this.peers.get(id) - this.error(error, peer) - }) + this.rlpx.on('peer:error', (rlpxPeer: any, error: Error) => { + const peerId = rlpxPeer && rlpxPeer.getId() + if (!peerId) { + return this.error(error) + } + const id = peerId.toString('hex') + const peer = this.peers.get(id) + this.error(error, peer) + }) - this.rlpx.on('error', (e: Error) => this.error(e)) + this.rlpx.on('error', (e: Error) => this.error(e)) - this.rlpx.on('listening', () => { - this.emit('listening', { - transport: this.name, - url: this.getRlpxInfo().enode, + this.rlpx.on('listening', () => { + this.emit('listening', { + transport: this.name, + url: this.getRlpxInfo().enode, + }) + resolve() }) - }) - if (this.config.port) { - this.rlpx.listen(this.config.port, '0.0.0.0') - } + if (this.config.port) { + this.rlpx.listen(this.config.port, '0.0.0.0') + } + }) } } diff --git a/packages/client/lib/net/server/server.ts b/packages/client/lib/net/server/server.ts index 3b90adb877b..94d3e89f8ea 100644 --- a/packages/client/lib/net/server/server.ts +++ b/packages/client/lib/net/server/server.ts @@ -82,11 +82,8 @@ export class Server extends EventEmitter { } /** - * Server bootstrap, on Libp2p this is currently done implicitly along the - * server start which might have negative implications for initial server - * connections due to concurrency reasons. - * - * TODO: extract Libp2p bootstrap from server.start() + * Server bootstrap. + * In Libp2p this is done during server start. */ async bootstrap(): Promise {} diff --git a/packages/client/test/client.spec.ts b/packages/client/test/client.spec.ts index 3d0e93ee88c..a3b25a7b94b 100644 --- a/packages/client/test/client.spec.ts +++ b/packages/client/test/client.spec.ts @@ -25,13 +25,16 @@ tape('[EthereumClient]', async (t) => { open() {} start() {} stop() {} + bootstrap() {} } Server.prototype.open = td.func() Server.prototype.start = td.func() Server.prototype.stop = td.func() + Server.prototype.bootstrap = td.func() td.replace('../lib/net/server/server', { Server }) td.when(Server.prototype.start()).thenResolve() td.when(Server.prototype.stop()).thenResolve() + td.when(Server.prototype.bootstrap()).thenResolve() const { default: EthereumClient } = await import('../lib/client') diff --git a/packages/client/test/integration/util.ts b/packages/client/test/integration/util.ts index 106c7197674..923d75acc9a 100644 --- a/packages/client/test/integration/util.ts +++ b/packages/client/test/integration/util.ts @@ -49,6 +49,7 @@ export async function setup( service.synchronizer.execution.syncing = false } await service.start() + await server.start() return [server, service] } diff --git a/packages/client/test/net/server/rlpxserver.spec.ts b/packages/client/test/net/server/rlpxserver.spec.ts index 2d476ea7bae..b0fe171b0c9 100644 --- a/packages/client/test/net/server/rlpxserver.spec.ts +++ b/packages/client/test/net/server/rlpxserver.spec.ts @@ -101,6 +101,7 @@ tape('[RlpxServer]', async (t) => { server.dpt = td.object() td.when(server.dpt!.getDnsPeers()).thenResolve([dnsPeerInfo]) await server.start() + await server.bootstrap() td.verify(server.dpt!.bootstrap(dnsPeerInfo)) await server.stop() t.end() @@ -184,14 +185,16 @@ tape('[RlpxServer]', async (t) => { ;(server.dpt as any).emit('error', 'err0') }) - t.test('should init rlpx', (t) => { + t.test('should init rlpx', async (t) => { t.plan(4) const config = new Config({ loglevel: 'error', transports: [] }) const server = new RlpxServer({ config }) const rlpxPeer = new RlpxPeer() td.when(rlpxPeer.getId()).thenReturn(Buffer.from([1])) td.when(RlpxPeer.prototype.accept(rlpxPeer, td.matchers.isA(RlpxServer))).thenResolve() - server.initRlpx() + server.initRlpx().catch((error) => { + throw error + }) td.verify(RlpxPeer.capabilities(Array.from((server as any).protocols))) td.verify(server.rlpx!.listen(server.config.port, '0.0.0.0')) server.on('connected', (peer: any) => t.ok(peer instanceof RlpxPeer, 'connected')) @@ -208,14 +211,16 @@ tape('[RlpxServer]', async (t) => { server.rlpx!.emit('listening') }) - t.test('should handles errors from id-less peers', (t) => { + t.test('should handles errors from id-less peers', async (t) => { t.plan(1) const config = new Config({ loglevel: 'error', transports: [] }) const server = new RlpxServer({ config }) const rlpxPeer = new RlpxPeer() td.when(rlpxPeer.getId()).thenReturn(Buffer.from('test')) td.when(RlpxPeer.prototype.accept(rlpxPeer, td.matchers.isA(RlpxServer))).thenResolve() - server.initRlpx() + server.initRlpx().catch((error) => { + throw error + }) server.on('error', (err: any) => t.equals(err.message, 'err0', 'got error')) server.rlpx!.emit('peer:error', rlpxPeer, new Error('err0')) }) diff --git a/packages/client/test/service/fullethereumservice.spec.ts b/packages/client/test/service/fullethereumservice.spec.ts index dad1abd0359..e590ec1a28f 100644 --- a/packages/client/test/service/fullethereumservice.spec.ts +++ b/packages/client/test/service/fullethereumservice.spec.ts @@ -80,9 +80,7 @@ tape('[FullEthereumService]', async (t) => { t.notOk(await service.start(), 'already started') await service.stop() td.verify(service.synchronizer.stop()) - td.verify(server.start()) t.notOk(await service.stop(), 'already stopped') - await server.stop() t.end() }) diff --git a/packages/client/test/service/lightethereumservice.spec.ts b/packages/client/test/service/lightethereumservice.spec.ts index 5c02f168d0f..d7f98c43557 100644 --- a/packages/client/test/service/lightethereumservice.spec.ts +++ b/packages/client/test/service/lightethereumservice.spec.ts @@ -72,7 +72,6 @@ tape('[LightEthereumService]', async (t) => { t.notOk(await service.start(), 'already started') await service.stop() td.verify(service.synchronizer.stop()) - td.verify(server.start()) t.notOk(await service.stop(), 'already stopped') t.end() })