Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/libp2p-daemon-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ export interface PubSubClient {
publish: (topic: string, data: Uint8Array) => Promise<void>
subscribe: (topic: string) => AsyncIterable<PSMessage>
getTopics: () => Promise<string[]>
getSubscribers: (topic: string) => Promise<PeerId[]>
}

export interface DaemonClient {
Expand Down
36 changes: 36 additions & 0 deletions packages/libp2p-daemon-client/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
PSMessage
} from '@libp2p/daemon-protocol'
import type { DaemonClient } from './index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import { peerIdFromBytes } from '@libp2p/peer-id'

export class Pubsub {
private readonly client: DaemonClient
Expand Down Expand Up @@ -123,4 +125,38 @@ export class Pubsub {
yield PSMessage.decode(message)
}
}

async getSubscribers (topic: string): Promise<PeerId[]> {
if (typeof topic !== 'string') {
throw new CodeError('invalid topic received', 'ERR_INVALID_TOPIC')
}

const sh = await this.client.send({
type: Request.Type.PUBSUB,
pubsub: {
type: PSRequest.Type.LIST_PEERS,
topic
}
})

const message = await sh.read()

if (message == null) {
throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE')
}

const response = Response.decode(message)

await sh.close()

if (response.type !== Response.Type.OK) {
throw new CodeError(response.error?.msg ?? 'Pubsub get subscribers failed', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED')
}

if (response.pubsub == null || response.pubsub.topics == null) {
throw new CodeError('Invalid response', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED')
}

return response.pubsub.peerIDs.map(buf => peerIdFromBytes(buf))
}
}
30 changes: 30 additions & 0 deletions packages/libp2p-daemon-client/test/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createClient, DaemonClient } from '../src/index.js'
import { multiaddr } from '@multiformats/multiaddr'
import { StubbedInstance, stubInterface } from 'sinon-ts'
import type { PubSub } from '@libp2p/interface-pubsub'
import { peerIdFromString } from '@libp2p/peer-id'

const defaultMultiaddr = multiaddr('/ip4/0.0.0.0/tcp/12345')

Expand Down Expand Up @@ -88,4 +89,33 @@ describe('daemon pubsub client', function () {
await expect(client.pubsub.publish(topic, data)).to.eventually.be.rejectedWith(/Urk!/)
})
})

describe('getSubscribers', () => {
it('should get empty list of topics when no subscriptions exist', async () => {
pubsub.getSubscribers.returns([])

const topic = 'test-topic'
const topics = await client.pubsub.getSubscribers(topic)

expect(topics).to.have.lengthOf(0)
})

it('should get a list with a peer when subscribed', async () => {
const topic = 'test-topic'
const peer = peerIdFromString('12D3KooWKnQbfH5t1XxJW5FBoMGNjmC9LTSbDdRJxtYj2bJV5XfP')
pubsub.getSubscribers.withArgs(topic).returns([peer])

const peers = await client.pubsub.getSubscribers(topic)

expect(peers).to.have.lengthOf(1)
expect(peers[0].toString()).to.equal(peer.toString())
})

it('should error if receive an error message', async () => {
const topic = 'test-topic'
pubsub.getSubscribers.throws(new Error('Urk!'))

await expect(client.pubsub.getSubscribers(topic)).to.eventually.be.rejectedWith(/Urk!/)
})
})
})
7 changes: 7 additions & 0 deletions packages/libp2p-daemon-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,13 @@ export class Server implements Libp2pServer {

yield * this.pubsubOperations.publish(request.topic, request.data)
return
case PSRequest.Type.LIST_PEERS:
if (request.topic == null) {
throw new Error('Invalid request')
}

yield * this.pubsubOperations.listPeers(request.topic)
return
default:
throw new Error('ERR_INVALID_REQUEST_TYPE')
}
Expand Down
14 changes: 14 additions & 0 deletions packages/libp2p-daemon-server/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,18 @@ export class PubSubOperations {
yield ErrorResponse(err)
}
}

async * listPeers (topic: string): AsyncGenerator<Uint8Array, void, undefined> {
try {
yield OkResponse({
pubsub: {
topics: [topic],
peerIDs: this.pubsub.getSubscribers(topic).map(peer => peer.toBytes())
}
})
} catch (err: any) {
log.error(err)
yield ErrorResponse(err)
}
}
}