Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/client/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ export default class EthereumClient extends events.EventEmitter {
if (this.started) {
return false
}
await Promise.all(this.config.servers.map((s) => s.start()))
await Promise.all(this.services.map((s) => s.start()))
await Promise.all(this.config.servers.map((s) => s.start()))
await Promise.all(this.config.servers.map((s) => s.bootstrap()))
this.started = true
}

Expand Down
124 changes: 61 additions & 63 deletions packages/client/lib/net/server/rlpxserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,8 @@ export class RlpxServer extends Server {
}
await super.start()
this.initDpt()
this.initRlpx()
await this.initRlpx()
this.started = true
// Boostrapping is technically not needed for a server start
// (this is a repeated process) and setting `started` to `true`
// before allows other services to resolve earlier and makes
// the sync pick-up more reliable
await this.bootstrap()

return true
}
Expand Down Expand Up @@ -236,70 +231,73 @@ export class RlpxServer extends Server {
* Initializes RLPx instance for peer management
* @private
*/
initRlpx() {
this.rlpx = new Devp2pRLPx(this.key, {
dpt: this.dpt!,
maxPeers: this.config.maxPeers,
capabilities: RlpxPeer.capabilities(Array.from(this.protocols)),
remoteClientIdFilter: this.clientFilter,
listenPort: this.config.port,
common: this.config.chainCommon,
})
async initRlpx() {
return new Promise<void>((resolve) => {
this.rlpx = new Devp2pRLPx(this.key, {
dpt: this.dpt!,
maxPeers: this.config.maxPeers,
capabilities: RlpxPeer.capabilities(Array.from(this.protocols)),
remoteClientIdFilter: this.clientFilter,
listenPort: this.config.port,
common: this.config.chainCommon,
})

this.rlpx.on('peer:added', async (rlpxPeer: Devp2pRLPxPeer) => {
const peer = new RlpxPeer({
config: this.config,
id: rlpxPeer.getId()!.toString('hex'),
host: rlpxPeer._socket.remoteAddress!,
port: rlpxPeer._socket.remotePort!,
protocols: Array.from(this.protocols),
// @ts-ignore: Property 'server' does not exist on type 'Socket'.
// TODO: check this error
inbound: !!rlpxPeer._socket.server,
this.rlpx.on('peer:added', async (rlpxPeer: Devp2pRLPxPeer) => {
const peer = new RlpxPeer({
config: this.config,
id: rlpxPeer.getId()!.toString('hex'),
host: rlpxPeer._socket.remoteAddress!,
port: rlpxPeer._socket.remotePort!,
protocols: Array.from(this.protocols),
// @ts-ignore: Property 'server' does not exist on type 'Socket'.
// TODO: check this error
inbound: !!rlpxPeer._socket.server,
})
try {
await peer.accept(rlpxPeer, this)
this.peers.set(peer.id, peer)
this.config.logger.debug(`Peer connected: ${peer}`)
this.emit('connected', peer)
} catch (error) {
this.error(error)
}
})
try {
await peer.accept(rlpxPeer, this)
this.peers.set(peer.id, peer)
this.config.logger.debug(`Peer connected: ${peer}`)
this.emit('connected', peer)
} catch (error) {
this.error(error)
}
})

this.rlpx.on('peer:removed', (rlpxPeer: Devp2pRLPxPeer, reason: any) => {
const id = (rlpxPeer.getId() as Buffer).toString('hex')
const peer = this.peers.get(id)
if (peer) {
this.peers.delete(peer.id)
this.config.logger.debug(
`Peer disconnected (${rlpxPeer.getDisconnectPrefix(reason)}): ${peer}`
)
this.emit('disconnected', peer)
}
})
this.rlpx.on('peer:removed', (rlpxPeer: Devp2pRLPxPeer, reason: any) => {
const id = (rlpxPeer.getId() as Buffer).toString('hex')
const peer = this.peers.get(id)
if (peer) {
this.peers.delete(peer.id)
this.config.logger.debug(
`Peer disconnected (${rlpxPeer.getDisconnectPrefix(reason)}): ${peer}`
)
this.emit('disconnected', peer)
}
})

this.rlpx.on('peer:error', (rlpxPeer: any, error: Error) => {
const peerId = rlpxPeer && rlpxPeer.getId()
if (!peerId) {
return this.error(error)
}
const id = peerId.toString('hex')
const peer = this.peers.get(id)
this.error(error, peer)
})
this.rlpx.on('peer:error', (rlpxPeer: any, error: Error) => {
const peerId = rlpxPeer && rlpxPeer.getId()
if (!peerId) {
return this.error(error)
}
const id = peerId.toString('hex')
const peer = this.peers.get(id)
this.error(error, peer)
})

this.rlpx.on('error', (e: Error) => this.error(e))
this.rlpx.on('error', (e: Error) => this.error(e))

this.rlpx.on('listening', () => {
this.emit('listening', {
transport: this.name,
url: this.getRlpxInfo().enode,
this.rlpx.on('listening', () => {
this.emit('listening', {
transport: this.name,
url: this.getRlpxInfo().enode,
})
resolve()
})
})

if (this.config.port) {
this.rlpx.listen(this.config.port, '0.0.0.0')
}
if (this.config.port) {
this.rlpx.listen(this.config.port, '0.0.0.0')
}
})
}
}
6 changes: 6 additions & 0 deletions packages/client/lib/net/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ export class Server extends EventEmitter {
return true
}

/**
* Server bootstrap.
* In Libp2p this is done during server start.
*/
async bootstrap(): Promise<void> {}

/**
* Stop server. Returns a promise that resolves once server has been stopped.
*/
Expand Down
1 change: 0 additions & 1 deletion packages/client/lib/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ export class Service extends events.EventEmitter {
if (this.running) {
return false
}
await Promise.all(this.config.servers.map((s) => s.start()))
this.running = true
this.config.logger.info(`Started ${this.name} service.`)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/client/test/client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ tape('[EthereumClient]', async (t) => {
open() {}
start() {}
stop() {}
bootstrap() {}
}
Server.prototype.open = td.func<any>()
Server.prototype.start = td.func<any>()
Server.prototype.stop = td.func<any>()
Server.prototype.bootstrap = td.func<any>()
td.replace('../lib/net/server/server', { Server })
td.when(Server.prototype.start()).thenResolve()
td.when(Server.prototype.stop()).thenResolve()
td.when(Server.prototype.bootstrap()).thenResolve()

const { default: EthereumClient } = await import('../lib/client')

Expand Down
1 change: 1 addition & 0 deletions packages/client/test/integration/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export async function setup(
service.synchronizer.execution.syncing = false
}
await service.start()
await server.start()

return [server, service]
}
Expand Down
13 changes: 9 additions & 4 deletions packages/client/test/net/server/rlpxserver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ tape('[RlpxServer]', async (t) => {
server.dpt = td.object<typeof server['dpt']>()
td.when(server.dpt!.getDnsPeers()).thenResolve([dnsPeerInfo])
await server.start()
await server.bootstrap()
td.verify(server.dpt!.bootstrap(dnsPeerInfo))
await server.stop()
t.end()
Expand Down Expand Up @@ -184,14 +185,16 @@ tape('[RlpxServer]', async (t) => {
;(server.dpt as any).emit('error', 'err0')
})

t.test('should init rlpx', (t) => {
t.test('should init rlpx', async (t) => {
t.plan(4)
const config = new Config({ loglevel: 'error', transports: [] })
const server = new RlpxServer({ config })
const rlpxPeer = new RlpxPeer()
td.when(rlpxPeer.getId()).thenReturn(Buffer.from([1]))
td.when(RlpxPeer.prototype.accept(rlpxPeer, td.matchers.isA(RlpxServer))).thenResolve()
server.initRlpx()
server.initRlpx().catch((error) => {
throw error
})
td.verify(RlpxPeer.capabilities(Array.from((server as any).protocols)))
td.verify(server.rlpx!.listen(server.config.port, '0.0.0.0'))
server.on('connected', (peer: any) => t.ok(peer instanceof RlpxPeer, 'connected'))
Expand All @@ -208,14 +211,16 @@ tape('[RlpxServer]', async (t) => {
server.rlpx!.emit('listening')
})

t.test('should handles errors from id-less peers', (t) => {
t.test('should handles errors from id-less peers', async (t) => {
t.plan(1)
const config = new Config({ loglevel: 'error', transports: [] })
const server = new RlpxServer({ config })
const rlpxPeer = new RlpxPeer()
td.when(rlpxPeer.getId()).thenReturn(Buffer.from('test'))
td.when(RlpxPeer.prototype.accept(rlpxPeer, td.matchers.isA(RlpxServer))).thenResolve()
server.initRlpx()
server.initRlpx().catch((error) => {
throw error
})
server.on('error', (err: any) => t.equals(err.message, 'err0', 'got error'))
server.rlpx!.emit('peer:error', rlpxPeer, new Error('err0'))
})
Expand Down
2 changes: 0 additions & 2 deletions packages/client/test/service/fullethereumservice.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ tape('[FullEthereumService]', async (t) => {
t.notOk(await service.start(), 'already started')
await service.stop()
td.verify(service.synchronizer.stop())
td.verify(server.start())
t.notOk(await service.stop(), 'already stopped')
await server.stop()
t.end()
})

Expand Down
1 change: 0 additions & 1 deletion packages/client/test/service/lightethereumservice.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ tape('[LightEthereumService]', async (t) => {
t.notOk(await service.start(), 'already started')
await service.stop()
td.verify(service.synchronizer.stop())
td.verify(server.start())
t.notOk(await service.stop(), 'already stopped')
t.end()
})
Expand Down