Skip to content

Commit

Permalink
fix: allow multiple ping messages (#2711)
Browse files Browse the repository at this point in the history
Read incoming ping messges in 32 byte chunks as per the spec and
echo them back to the sending peer.

If the remove fails to send us 32 bytes, reset the stream with a
`TimeoutError`.
  • Loading branch information
achingbrain authored Sep 22, 2024
1 parent 4fd7eb2 commit c628c44
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 59 deletions.
4 changes: 1 addition & 3 deletions packages/protocol-ping/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,13 @@
"@libp2p/interface": "^2.0.1",
"@libp2p/interface-internal": "^2.0.1",
"@multiformats/multiaddr": "^12.2.3",
"it-first": "^3.0.6",
"it-pipe": "^3.0.1",
"it-byte-stream": "^1.1.0",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"@libp2p/logger": "^5.0.1",
"@libp2p/peer-id": "^5.0.1",
"aegir": "^44.0.1",
"it-byte-stream": "^1.0.10",
"it-pair": "^2.0.6",
"p-defer": "^4.0.1",
"sinon-ts": "^2.0.0"
Expand Down
69 changes: 24 additions & 45 deletions packages/protocol-ping/src/ping.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { randomBytes } from '@libp2p/crypto'
import { AbortError, InvalidMessageError, ProtocolError, TimeoutError } from '@libp2p/interface'
import first from 'it-first'
import { pipe } from 'it-pipe'
import { ProtocolError, TimeoutError } from '@libp2p/interface'
import { byteStream } from 'it-byte-stream'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS } from './constants.js'
import type { PingServiceComponents, PingServiceInit, PingService as PingServiceInterface } from './index.js'
Expand Down Expand Up @@ -60,37 +59,29 @@ export class PingService implements Startable, PingServiceInterface {

const { stream } = data
const start = Date.now()

const signal = AbortSignal.timeout(this.timeout)
signal.addEventListener('abort', () => {
stream?.abort(new TimeoutError('ping timeout'))
const bytes = byteStream(stream)

Promise.resolve().then(async () => {
while (true) {
const signal = AbortSignal.timeout(this.timeout)
signal.addEventListener('abort', () => {
stream?.abort(new TimeoutError('ping timeout'))
})

const buf = await bytes.read(PING_LENGTH, {
signal
})
await bytes.write(buf, {
signal
})
}
})

void pipe(
stream,
async function * (source) {
let received = 0

for await (const buf of source) {
received += buf.byteLength

if (received > PING_LENGTH) {
stream?.abort(new InvalidMessageError('Too much data received'))
return
}

yield buf
}
},
stream
)
.catch(err => {
this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err)
stream?.abort(err)
})
.finally(() => {
const ms = Date.now() - start

this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms)
})
}
Expand All @@ -105,7 +96,6 @@ export class PingService implements Startable, PingServiceInterface {
const data = randomBytes(PING_LENGTH)
const connection = await this.components.connectionManager.openConnection(peer, options)
let stream: Stream | undefined
let onAbort = (): void => {}

if (options.signal == null) {
const signal = AbortSignal.timeout(this.timeout)
Expand All @@ -122,25 +112,15 @@ export class PingService implements Startable, PingServiceInterface {
runOnLimitedConnection: this.runOnLimitedConnection
})

onAbort = () => {
stream?.abort(new AbortError())
}

// make stream abortable
options.signal?.addEventListener('abort', onAbort, { once: true })
const bytes = byteStream(stream)

const result = await pipe(
[data],
stream,
async (source) => first(source)
)
const [, result] = await Promise.all([
bytes.write(data, options),
bytes.read(PING_LENGTH, options)
])

const ms = Date.now() - start

if (result == null) {
throw new ProtocolError(`Did not receive a ping ack after ${ms}ms`)
}

if (!uint8ArrayEquals(data, result.subarray())) {
throw new ProtocolError(`Received wrong ping ack after ${ms}ms`)
}
Expand All @@ -155,9 +135,8 @@ export class PingService implements Startable, PingServiceInterface {

throw err
} finally {
options.signal?.removeEventListener('abort', onAbort)
if (stream != null) {
await stream.close()
await stream.close(options)
}
}
}
Expand Down
29 changes: 18 additions & 11 deletions packages/protocol-ping/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ describe('ping', () => {
logger: defaultLogger()
}

ping = new PingService(components)
ping = new PingService(components, {
timeout: 50
})

await start(ping)
})
Expand Down Expand Up @@ -105,17 +107,20 @@ describe('ping', () => {
connection: stubInterface<Connection>()
})

const input = Uint8Array.from([0, 1, 2, 3, 4])

const b = byteStream(outgoingStream)
void b.write(input)

const input = new Uint8Array(32).fill(1)
void b.write(input)
const output = await b.read()

expect(output).to.equalBytes(input)

const input2 = new Uint8Array(32).fill(2)
void b.write(input2)
const output2 = await b.read()
expect(output2).to.equalBytes(input2)
})

it('should abort stream if too much ping data received', async () => {
it('should abort stream if sending stalls', async () => {
const deferred = pDefer<Error>()

const duplex = duplexPair<any>()
Expand All @@ -135,14 +140,16 @@ describe('ping', () => {
connection: stubInterface<Connection>()
})

const input = new Uint8Array(100)
const b = byteStream(outgoingStream)

void b.read(100)
void b.write(input)
// send a ping message plus a few extra bytes
void b.write(new Uint8Array(35))

const err = await deferred.promise
const pong = await b.read()
expect(pong).to.have.lengthOf(32)

expect(err).to.have.property('name', 'InvalidMessageError')
// never send the remaining 29 bytes (e.g. 64 - 35)
const err = await deferred.promise
expect(err).to.have.property('name', 'TimeoutError')
})
})

0 comments on commit c628c44

Please sign in to comment.