diff --git a/packages/libp2p-daemon-client/package.json b/packages/libp2p-daemon-client/package.json index f4080cdb..da05d71b 100644 --- a/packages/libp2p-daemon-client/package.json +++ b/packages/libp2p-daemon-client/package.json @@ -143,7 +143,8 @@ "@multiformats/multiaddr": "^10.1.8", "err-code": "^3.0.1", "it-stream-types": "^1.0.4", - "multiformats": "^9.6.4" + "multiformats": "^9.6.4", + "uint8arraylist": "^2.3.2" }, "devDependencies": { "@libp2p/components": "^2.0.0", diff --git a/packages/libp2p-daemon-client/src/index.ts b/packages/libp2p-daemon-client/src/index.ts index 2719cef9..8d95b1e0 100644 --- a/packages/libp2p-daemon-client/src/index.ts +++ b/packages/libp2p-daemon-client/src/index.ts @@ -1,6 +1,6 @@ import errcode from 'err-code' import { TCP } from '@libp2p/tcp' -import { PSMessage, Request, Response } from '@libp2p/daemon-protocol' +import { PSMessage, Request, Response, StreamInfo } from '@libp2p/daemon-protocol' import { StreamHandler } from '@libp2p/daemon-protocol/stream-handler' import { Multiaddr } from '@multiformats/multiaddr' import { DHT } from './dht.js' @@ -12,6 +12,10 @@ import type { Duplex } from 'it-stream-types' import type { CID } from 'multiformats/cid' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { MultiaddrConnection } from '@libp2p/interface-connection' +import type { Uint8ArrayList } from 'uint8arraylist' +import { logger } from '@libp2p/logger' + +const log = logger('libp2p:daemon-client') class Client implements DaemonClient { private readonly multiaddr: Multiaddr @@ -22,7 +26,6 @@ class Client implements DaemonClient { constructor (addr: Multiaddr) { this.multiaddr = addr this.tcp = new TCP() - this.dht = new DHT(this) this.pubsub = new Pubsub(this) } @@ -150,7 +153,7 @@ class Client implements DaemonClient { /** * Initiate an outbound stream to a peer on one of a set of protocols. */ - async openStream (peerId: PeerId, protocol: string): Promise> { + async openStream (peerId: PeerId, protocol: string): Promise> { if (!isPeerId(peerId)) { throw errcode(new Error('invalid peer id received'), 'ERR_INVALID_PEER_ID') } @@ -181,20 +184,58 @@ class Client implements DaemonClient { /** * Register a handler for inbound streams on a given protocol */ - async registerStreamHandler (addr: Multiaddr, protocol: string) { - if (!Multiaddr.isMultiaddr(addr)) { - throw errcode(new Error('invalid multiaddr received'), 'ERR_INVALID_MULTIADDR') - } - + async registerStreamHandler (protocol: string, handler: StreamHandlerFunction): Promise { if (typeof protocol !== 'string') { throw errcode(new Error('invalid protocol received'), 'ERR_INVALID_PROTOCOL') } + // open a tcp port, pipe any data from it to the handler function + const listener = this.tcp.createListener({ + upgrader: passThroughUpgrader, + handler: (connection) => { + Promise.resolve() + .then(async () => { + const sh = new StreamHandler({ + // @ts-expect-error because we are using a passthrough upgrader, this is a MultiaddrConnection + stream: connection + }) + const message = await sh.read() + + if (message == null) { + throw errcode(new Error('Could not read open stream response'), 'ERR_OPEN_STREAM_FAILED') + } + + const response = StreamInfo.decode(message) + + if (response.proto !== protocol) { + throw errcode(new Error('Incorrect protocol'), 'ERR_OPEN_STREAM_FAILED') + } + + await handler(sh.rest()) + }) + .finally(() => { + connection.close() + .catch(err => { + log.error(err) + }) + listener.close() + .catch(err => { + log.error(err) + }) + }) + } + }) + await listener.listen(new Multiaddr('/ip4/127.0.0.1/tcp/0')) + const address = listener.getAddrs()[0] + + if (address == null) { + throw errcode(new Error('Could not listen on port'), 'ERR_REGISTER_STREAM_HANDLER_FAILED') + } + const sh = await this.send({ type: Request.Type.STREAM_HANDLER, - streamOpen: undefined, streamHandler: { - addr: addr.bytes, + addr: address.bytes, proto: [protocol] } }) @@ -215,6 +256,10 @@ export interface IdentifyResult { addrs: Multiaddr[] } +export interface StreamHandlerFunction { + (stream: Duplex): Promise +} + export interface DHTClient { put: (key: Uint8Array, value: Uint8Array) => Promise get: (key: Uint8Array) => Promise @@ -238,7 +283,8 @@ export interface DaemonClient { pubsub: PubSubClient send: (request: Request) => Promise - openStream: (peerId: PeerId, protocol: string) => Promise> + openStream: (peerId: PeerId, protocol: string) => Promise> + registerStreamHandler: (protocol: string, handler: StreamHandlerFunction) => Promise } export function createClient (multiaddr: Multiaddr): DaemonClient { diff --git a/packages/libp2p-daemon-client/test/stream.spec.ts b/packages/libp2p-daemon-client/test/stream.spec.ts index 0622f5b6..fcdfbd90 100644 --- a/packages/libp2p-daemon-client/test/stream.spec.ts +++ b/packages/libp2p-daemon-client/test/stream.spec.ts @@ -86,6 +86,6 @@ describe('daemon stream client', function () { ) expect(data).to.have.lengthOf(1) - expect(uint8ArrayToString(data[0])).to.equal('hello world') + expect(uint8ArrayToString(data[0].subarray())).to.equal('hello world') }) }) diff --git a/packages/libp2p-daemon-protocol/src/stream-handler.ts b/packages/libp2p-daemon-protocol/src/stream-handler.ts index 46d73c2d..9f7c586a 100644 --- a/packages/libp2p-daemon-protocol/src/stream-handler.ts +++ b/packages/libp2p-daemon-protocol/src/stream-handler.ts @@ -14,7 +14,7 @@ export interface StreamHandlerOptions { export class StreamHandler { private readonly stream: Duplex - private readonly shake: Handshake + private readonly shake: Handshake public decoder: Source /** * Create a stream handler for connection @@ -34,7 +34,7 @@ export class StreamHandler { // @ts-expect-error decoder is really a generator const msg = await this.decoder.next() if (msg.value != null) { - return msg.value.slice() + return msg.value.subarray() } log('read received no value, closing stream') diff --git a/packages/libp2p-daemon-server/src/index.ts b/packages/libp2p-daemon-server/src/index.ts index 7c3e78ee..5e91ec28 100644 --- a/packages/libp2p-daemon-server/src/index.ts +++ b/packages/libp2p-daemon-server/src/index.ts @@ -15,7 +15,7 @@ import { StreamInfo } from '@libp2p/daemon-protocol' import type { Listener } from '@libp2p/interface-transport' -import type { Connection, Stream } from '@libp2p/interface-connection' +import type { Connection, MultiaddrConnection, Stream } from '@libp2p/interface-connection' import type { PeerId } from '@libp2p/interface-peer-id' import type { AbortOptions } from '@libp2p/interfaces' import type { StreamHandler as StreamCallback } from '@libp2p/interface-registrar' @@ -67,7 +67,6 @@ export class Server implements Libp2pServer { private readonly libp2p: Libp2p private readonly tcp: TCP private readonly listener: Listener - private readonly streamHandlers: Record private readonly dhtOperations?: DHTOperations private readonly pubsubOperations?: PubSubOperations @@ -81,7 +80,6 @@ export class Server implements Libp2pServer { handler: this.handleConnection.bind(this), upgrader: passThroughUpgrader }) - this.streamHandlers = {} this._onExit = this._onExit.bind(this) if (libp2pNode.dht != null) { @@ -118,9 +116,7 @@ export class Server implements Libp2pServer { } const { peer, proto } = request.streamOpen - const peerId = peerIdFromBytes(peer) - const connection = await this.libp2p.dial(peerId) const stream = await connection.newStream(proto) @@ -146,18 +142,18 @@ export class Server implements Libp2pServer { const protocols = request.streamHandler.proto const addr = new Multiaddr(request.streamHandler.addr) - const addrString = addr.toString() - - // If we have a handler, end it - if (this.streamHandlers[addrString] != null) { - await this.streamHandlers[addrString].close() - delete this.streamHandlers[addrString] // eslint-disable-line @typescript-eslint/no-dynamic-delete - } + let conn: MultiaddrConnection + + await this.libp2p.handle(protocols, ({ connection, stream }) => { + Promise.resolve() + .then(async () => { + // Connect the client socket with the libp2p connection + // @ts-expect-error because we use a passthrough upgrader, + // this is actually a MultiaddrConnection and not a Connection + conn = await this.tcp.dial(addr, { + upgrader: passThroughUpgrader + }) - await Promise.all( - protocols.map(async (proto) => { - // Connect the client socket with the libp2p connection - await this.libp2p.handle(proto, ({ connection, stream }) => { const message = StreamInfo.encode({ peer: connection.remotePeer.toBytes(), addr: connection.remoteAddr.bytes, @@ -167,21 +163,36 @@ export class Server implements Libp2pServer { // Tell the client about the new connection // And then begin piping the client and peer connection - void pipe( - [encodedMessage, stream.source], - // @ts-expect-error because we use a passthrough upgrader, - // this is actually a MultiaddrConnection and not a Connection - clientConnection, + await pipe( + (async function * () { + yield encodedMessage + yield * stream.source + }()), + async function * (source) { + for await (const list of source) { + // convert Uint8ArrayList to Uint8Arrays for the socket + yield * list + } + }, + conn, stream.sink - ).catch(err => { - log.error(err) - }) + ) }) - }) - ) + .catch(async err => { + log.error(err) - const clientConnection = await this.tcp.dial(addr, { - upgrader: passThroughUpgrader + if (conn != null) { + await conn.close(err) + } + }) + .finally(() => { + if (conn != null) { + conn.close() + .catch(err => { + log.error(err) + }) + } + }) }) }