From 2f24750168fd1ddc1afd4f90435b16f3532d432e Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 23 Feb 2023 10:48:44 +0000 Subject: [PATCH] feat: add get subscribers for pubsub topics With this implemented we can remove some of the arbitrary delays in the interop tests, instead we can wait until we see the remote node in the topic peer list. --- packages/libp2p-daemon-client/src/index.ts | 1 + packages/libp2p-daemon-client/src/pubsub.ts | 36 +++++++++++++++++++ .../libp2p-daemon-client/test/pubsub.spec.ts | 30 ++++++++++++++++ packages/libp2p-daemon-server/src/index.ts | 7 ++++ packages/libp2p-daemon-server/src/pubsub.ts | 14 ++++++++ 5 files changed, 88 insertions(+) diff --git a/packages/libp2p-daemon-client/src/index.ts b/packages/libp2p-daemon-client/src/index.ts index 79298df3..ef56b204 100644 --- a/packages/libp2p-daemon-client/src/index.ts +++ b/packages/libp2p-daemon-client/src/index.ts @@ -295,6 +295,7 @@ export interface PubSubClient { publish: (topic: string, data: Uint8Array) => Promise subscribe: (topic: string) => AsyncIterable getTopics: () => Promise + getSubscribers: (topic: string) => Promise } export interface DaemonClient { diff --git a/packages/libp2p-daemon-client/src/pubsub.ts b/packages/libp2p-daemon-client/src/pubsub.ts index 69ff71b2..2133741e 100644 --- a/packages/libp2p-daemon-client/src/pubsub.ts +++ b/packages/libp2p-daemon-client/src/pubsub.ts @@ -6,6 +6,8 @@ import { PSMessage } from '@libp2p/daemon-protocol' import type { DaemonClient } from './index.js' +import type { PeerId } from '@libp2p/interface-peer-id' +import { peerIdFromBytes } from '@libp2p/peer-id' export class Pubsub { private readonly client: DaemonClient @@ -123,4 +125,38 @@ export class Pubsub { yield PSMessage.decode(message) } } + + async getSubscribers (topic: string): Promise { + if (typeof topic !== 'string') { + throw new CodeError('invalid topic received', 'ERR_INVALID_TOPIC') + } + + const sh = await this.client.send({ + type: Request.Type.PUBSUB, + pubsub: { + type: PSRequest.Type.LIST_PEERS, + topic + } + }) + + const message = await sh.read() + + if (message == null) { + throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE') + } + + const response = Response.decode(message) + + await sh.close() + + if (response.type !== Response.Type.OK) { + throw new CodeError(response.error?.msg ?? 'Pubsub get subscribers failed', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED') + } + + if (response.pubsub == null || response.pubsub.topics == null) { + throw new CodeError('Invalid response', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED') + } + + return response.pubsub.peerIDs.map(buf => peerIdFromBytes(buf)) + } } diff --git a/packages/libp2p-daemon-client/test/pubsub.spec.ts b/packages/libp2p-daemon-client/test/pubsub.spec.ts index 07cb1087..b21e6af9 100644 --- a/packages/libp2p-daemon-client/test/pubsub.spec.ts +++ b/packages/libp2p-daemon-client/test/pubsub.spec.ts @@ -8,6 +8,7 @@ import { createClient, DaemonClient } from '../src/index.js' import { multiaddr } from '@multiformats/multiaddr' import { StubbedInstance, stubInterface } from 'sinon-ts' import type { PubSub } from '@libp2p/interface-pubsub' +import { peerIdFromString } from '@libp2p/peer-id' const defaultMultiaddr = multiaddr('/ip4/0.0.0.0/tcp/12345') @@ -88,4 +89,33 @@ describe('daemon pubsub client', function () { await expect(client.pubsub.publish(topic, data)).to.eventually.be.rejectedWith(/Urk!/) }) }) + + describe('getSubscribers', () => { + it('should get empty list of topics when no subscriptions exist', async () => { + pubsub.getSubscribers.returns([]) + + const topic = 'test-topic' + const topics = await client.pubsub.getSubscribers(topic) + + expect(topics).to.have.lengthOf(0) + }) + + it('should get a list with a peer when subscribed', async () => { + const topic = 'test-topic' + const peer = peerIdFromString('12D3KooWKnQbfH5t1XxJW5FBoMGNjmC9LTSbDdRJxtYj2bJV5XfP') + pubsub.getSubscribers.withArgs(topic).returns([peer]) + + const peers = await client.pubsub.getSubscribers(topic) + + expect(peers).to.have.lengthOf(1) + expect(peers[0].toString()).to.equal(peer.toString()) + }) + + it('should error if receive an error message', async () => { + const topic = 'test-topic' + pubsub.getSubscribers.throws(new Error('Urk!')) + + await expect(client.pubsub.getSubscribers(topic)).to.eventually.be.rejectedWith(/Urk!/) + }) + }) }) diff --git a/packages/libp2p-daemon-server/src/index.ts b/packages/libp2p-daemon-server/src/index.ts index 772e953f..c9810b25 100644 --- a/packages/libp2p-daemon-server/src/index.ts +++ b/packages/libp2p-daemon-server/src/index.ts @@ -297,6 +297,13 @@ export class Server implements Libp2pServer { yield * this.pubsubOperations.publish(request.topic, request.data) return + case PSRequest.Type.LIST_PEERS: + if (request.topic == null) { + throw new Error('Invalid request') + } + + yield * this.pubsubOperations.listPeers(request.topic) + return default: throw new Error('ERR_INVALID_REQUEST_TYPE') } diff --git a/packages/libp2p-daemon-server/src/pubsub.ts b/packages/libp2p-daemon-server/src/pubsub.ts index d9e53423..6c24ea07 100644 --- a/packages/libp2p-daemon-server/src/pubsub.ts +++ b/packages/libp2p-daemon-server/src/pubsub.ts @@ -84,4 +84,18 @@ export class PubSubOperations { yield ErrorResponse(err) } } + + async * listPeers (topic: string): AsyncGenerator { + try { + yield OkResponse({ + pubsub: { + topics: [topic], + peerIDs: this.pubsub.getSubscribers(topic).map(peer => peer.toBytes()) + } + }) + } catch (err: any) { + log.error(err) + yield ErrorResponse(err) + } + } }