Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/libp2p-daemon-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@
"@multiformats/multiaddr": "^10.1.8",
"err-code": "^3.0.1",
"it-stream-types": "^1.0.4",
"multiformats": "^9.6.4"
"multiformats": "^9.6.4",
"uint8arraylist": "^2.3.2"
},
"devDependencies": {
"@libp2p/components": "^2.0.0",
Expand Down
68 changes: 57 additions & 11 deletions packages/libp2p-daemon-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import errcode from 'err-code'
import { TCP } from '@libp2p/tcp'
import { PSMessage, Request, Response } from '@libp2p/daemon-protocol'
import { PSMessage, Request, Response, StreamInfo } from '@libp2p/daemon-protocol'
import { StreamHandler } from '@libp2p/daemon-protocol/stream-handler'
import { Multiaddr } from '@multiformats/multiaddr'
import { DHT } from './dht.js'
Expand All @@ -12,6 +12,10 @@ import type { Duplex } from 'it-stream-types'
import type { CID } from 'multiformats/cid'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
import type { Uint8ArrayList } from 'uint8arraylist'
import { logger } from '@libp2p/logger'

const log = logger('libp2p:daemon-client')

class Client implements DaemonClient {
private readonly multiaddr: Multiaddr
Expand All @@ -22,7 +26,6 @@ class Client implements DaemonClient {
constructor (addr: Multiaddr) {
this.multiaddr = addr
this.tcp = new TCP()

this.dht = new DHT(this)
this.pubsub = new Pubsub(this)
}
Expand Down Expand Up @@ -150,7 +153,7 @@ class Client implements DaemonClient {
/**
* Initiate an outbound stream to a peer on one of a set of protocols.
*/
async openStream (peerId: PeerId, protocol: string): Promise<Duplex<Uint8Array>> {
async openStream (peerId: PeerId, protocol: string): Promise<Duplex<Uint8ArrayList, Uint8Array>> {
if (!isPeerId(peerId)) {
throw errcode(new Error('invalid peer id received'), 'ERR_INVALID_PEER_ID')
}
Expand Down Expand Up @@ -181,20 +184,58 @@ class Client implements DaemonClient {
/**
* Register a handler for inbound streams on a given protocol
*/
async registerStreamHandler (addr: Multiaddr, protocol: string) {
if (!Multiaddr.isMultiaddr(addr)) {
throw errcode(new Error('invalid multiaddr received'), 'ERR_INVALID_MULTIADDR')
}

async registerStreamHandler (protocol: string, handler: StreamHandlerFunction): Promise<void> {
if (typeof protocol !== 'string') {
throw errcode(new Error('invalid protocol received'), 'ERR_INVALID_PROTOCOL')
}

// open a tcp port, pipe any data from it to the handler function
const listener = this.tcp.createListener({
upgrader: passThroughUpgrader,
handler: (connection) => {
Promise.resolve()
.then(async () => {
const sh = new StreamHandler({
// @ts-expect-error because we are using a passthrough upgrader, this is a MultiaddrConnection
stream: connection
})
const message = await sh.read()

if (message == null) {
throw errcode(new Error('Could not read open stream response'), 'ERR_OPEN_STREAM_FAILED')
}

const response = StreamInfo.decode(message)

if (response.proto !== protocol) {
throw errcode(new Error('Incorrect protocol'), 'ERR_OPEN_STREAM_FAILED')
}

await handler(sh.rest())
})
.finally(() => {
connection.close()
.catch(err => {
log.error(err)
})
listener.close()
.catch(err => {
log.error(err)
})
})
}
})
await listener.listen(new Multiaddr('/ip4/127.0.0.1/tcp/0'))
const address = listener.getAddrs()[0]

if (address == null) {
throw errcode(new Error('Could not listen on port'), 'ERR_REGISTER_STREAM_HANDLER_FAILED')
}

const sh = await this.send({
type: Request.Type.STREAM_HANDLER,
streamOpen: undefined,
streamHandler: {
addr: addr.bytes,
addr: address.bytes,
proto: [protocol]
}
})
Expand All @@ -215,6 +256,10 @@ export interface IdentifyResult {
addrs: Multiaddr[]
}

export interface StreamHandlerFunction {
(stream: Duplex<Uint8ArrayList, Uint8Array>): Promise<void>
}

export interface DHTClient {
put: (key: Uint8Array, value: Uint8Array) => Promise<void>
get: (key: Uint8Array) => Promise<Uint8Array>
Expand All @@ -238,7 +283,8 @@ export interface DaemonClient {
pubsub: PubSubClient

send: (request: Request) => Promise<StreamHandler>
openStream: (peerId: PeerId, protocol: string) => Promise<Duplex<Uint8Array>>
openStream: (peerId: PeerId, protocol: string) => Promise<Duplex<Uint8ArrayList, Uint8Array>>
registerStreamHandler: (protocol: string, handler: StreamHandlerFunction) => Promise<void>
}

export function createClient (multiaddr: Multiaddr): DaemonClient {
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p-daemon-client/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ describe('daemon stream client', function () {
)

expect(data).to.have.lengthOf(1)
expect(uint8ArrayToString(data[0])).to.equal('hello world')
expect(uint8ArrayToString(data[0].subarray())).to.equal('hello world')
})
})
4 changes: 2 additions & 2 deletions packages/libp2p-daemon-protocol/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface StreamHandlerOptions {

export class StreamHandler {
private readonly stream: Duplex<Uint8Array>
private readonly shake: Handshake
private readonly shake: Handshake<Uint8Array>
public decoder: Source<Uint8ArrayList>
/**
* Create a stream handler for connection
Expand All @@ -34,7 +34,7 @@ export class StreamHandler {
// @ts-expect-error decoder is really a generator
const msg = await this.decoder.next()
if (msg.value != null) {
return msg.value.slice()
return msg.value.subarray()
}

log('read received no value, closing stream')
Expand Down
67 changes: 39 additions & 28 deletions packages/libp2p-daemon-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
StreamInfo
} from '@libp2p/daemon-protocol'
import type { Listener } from '@libp2p/interface-transport'
import type { Connection, Stream } from '@libp2p/interface-connection'
import type { Connection, MultiaddrConnection, Stream } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { AbortOptions } from '@libp2p/interfaces'
import type { StreamHandler as StreamCallback } from '@libp2p/interface-registrar'
Expand Down Expand Up @@ -67,7 +67,6 @@ export class Server implements Libp2pServer {
private readonly libp2p: Libp2p
private readonly tcp: TCP
private readonly listener: Listener
private readonly streamHandlers: Record<string, StreamHandler>
private readonly dhtOperations?: DHTOperations
private readonly pubsubOperations?: PubSubOperations

Expand All @@ -81,7 +80,6 @@ export class Server implements Libp2pServer {
handler: this.handleConnection.bind(this),
upgrader: passThroughUpgrader
})
this.streamHandlers = {}
this._onExit = this._onExit.bind(this)

if (libp2pNode.dht != null) {
Expand Down Expand Up @@ -118,9 +116,7 @@ export class Server implements Libp2pServer {
}

const { peer, proto } = request.streamOpen

const peerId = peerIdFromBytes(peer)

const connection = await this.libp2p.dial(peerId)
const stream = await connection.newStream(proto)

Expand All @@ -146,18 +142,18 @@ export class Server implements Libp2pServer {

const protocols = request.streamHandler.proto
const addr = new Multiaddr(request.streamHandler.addr)
const addrString = addr.toString()

// If we have a handler, end it
if (this.streamHandlers[addrString] != null) {
await this.streamHandlers[addrString].close()
delete this.streamHandlers[addrString] // eslint-disable-line @typescript-eslint/no-dynamic-delete
}
let conn: MultiaddrConnection

await this.libp2p.handle(protocols, ({ connection, stream }) => {
Promise.resolve()
.then(async () => {
// Connect the client socket with the libp2p connection
// @ts-expect-error because we use a passthrough upgrader,
// this is actually a MultiaddrConnection and not a Connection
conn = await this.tcp.dial(addr, {
upgrader: passThroughUpgrader
})

await Promise.all(
protocols.map(async (proto) => {
// Connect the client socket with the libp2p connection
await this.libp2p.handle(proto, ({ connection, stream }) => {
const message = StreamInfo.encode({
peer: connection.remotePeer.toBytes(),
addr: connection.remoteAddr.bytes,
Expand All @@ -167,21 +163,36 @@ export class Server implements Libp2pServer {

// Tell the client about the new connection
// And then begin piping the client and peer connection
void pipe(
[encodedMessage, stream.source],
// @ts-expect-error because we use a passthrough upgrader,
// this is actually a MultiaddrConnection and not a Connection
clientConnection,
await pipe(
(async function * () {
yield encodedMessage
yield * stream.source
}()),
async function * (source) {
for await (const list of source) {
// convert Uint8ArrayList to Uint8Arrays for the socket
yield * list
}
},
conn,
stream.sink
).catch(err => {
log.error(err)
})
)
})
})
)
.catch(async err => {
log.error(err)

const clientConnection = await this.tcp.dial(addr, {
upgrader: passThroughUpgrader
if (conn != null) {
await conn.close(err)
}
})
.finally(() => {
if (conn != null) {
conn.close()
.catch(err => {
log.error(err)
})
}
})
})
}

Expand Down