Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
on:
push:
branches:
- master
- main
- default
pull_request:
branches:
- '**'

name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [12.x]
steps:
- uses: actions/checkout@v1
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm install
- name: Typecheck
uses: gozala/typescript-error-reporter-action@v1.0.8
12 changes: 12 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@
"types",
"dist"
],
"types": "dist/src/index.d.ts",
"typesVersions": {
"*": {
"src/*": [
"dist/src/*",
"dist/src/*/index"
]
}
},
"eslintConfig": {
"extends": "ipfs"
},
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
Expand Down
36 changes: 26 additions & 10 deletions src/connection/connection.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
'use strict'
/* eslint-disable valid-jsdoc */

const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const errCode = require('err-code')
const Status = require('./status')
const { OPEN, CLOSING, CLOSED } = require('./status')

const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')

Expand Down Expand Up @@ -51,7 +50,7 @@ class Connection {
/**
* Connection identifier.
*/
this.id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
this.id = (parseInt(String(Math.random() * 1e9))).toString(36) + Date.now()

/**
* Observed multiaddr of the local peer
Expand All @@ -75,10 +74,12 @@ class Connection {

/**
* Connection metadata.
*
* @type {Stat & {status: Status}}
*/
this._stat = {
...stat,
status: Status.OPEN
status: OPEN
}

/**
Expand Down Expand Up @@ -152,11 +153,11 @@ class Connection {
* @returns {Promise<{stream: MuxedStream, protocol: string}>} with muxed+multistream-selected stream and selected protocol
*/
async newStream (protocols) {
if (this.stat.status === Status.CLOSING) {
if (this.stat.status === CLOSING) {
throw errCode(new Error('the connection is being closed'), 'ERR_CONNECTION_BEING_CLOSED')
}

if (this.stat.status === Status.CLOSED) {
if (this.stat.status === CLOSED) {
throw errCode(new Error('the connection is closed'), 'ERR_CONNECTION_CLOSED')
}

Expand All @@ -178,7 +179,7 @@ class Connection {
* @param {MuxedStream} muxedStream - a muxed stream
* @param {object} properties - the stream properties to be registered
* @param {string} properties.protocol - the protocol used by the stream
* @param {object} properties.metadata - metadata of the stream
* @param {object} [properties.metadata] - metadata of the stream
* @returns {void}
*/
addStream (muxedStream, { protocol, metadata = {} }) {
Expand All @@ -204,24 +205,39 @@ class Connection {
* @returns {Promise<void>}
*/
async close () {
if (this.stat.status === Status.CLOSED) {
if (this.stat.status === CLOSED) {
return
}

if (this._closing) {
return this._closing
}

this.stat.status = Status.CLOSING
this.stat.status = CLOSING

// Close raw connection
this._closing = await this._close()

this._stat.timeline.close = Date.now()
this.stat.status = Status.CLOSED
this.stat.status = CLOSED
}
}

/**
* @typedef {Object} Stat
* @property {string} direction - connection establishment direction ("inbound" or "outbound").
* @property {Timeline} timeline - connection relevant events timestamp.
* @property {string} [multiplexer] - connection multiplexing identifier.
* @property {string} [encryption] - connection encryption method identifier.
*
* @typedef {Object} Timeline
* @property {number} open - connection opening timestamp.
* @property {number} [upgraded] - connection upgraded timestamp.
* @property {number} [close]
*
* @typedef {import('./status').Status} Status
*/

module.exports = Connection

function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
Expand Down
4 changes: 0 additions & 4 deletions src/connection/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
'use strict'

/**
* @module connection/index
* @type {typeof import('./connection')}
*/
exports.Connection = require('./connection')
7 changes: 6 additions & 1 deletion src/connection/status.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
'use strict'

module.exports = {
const STATUS = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be causing issues integrating with libp2p PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Property 'OPEN' does not exist on type 'typeof import("/Users/vsantos/work/pl/gh/libp2p/js-libp2p-interfaces/dist/src/connection/status")'"

Using:

const ConnectionStatus = require('libp2p-interfaces/src/connection/status')

ConnectionStatus.OPEN

OPEN: /** @type {'open'} */('open'),
CLOSING: /** @type {'closing'} */('closing'),
CLOSED: /** @type {'closed'} */('closed')
}
module.exports = STATUS

/**
* @typedef {STATUS[keyof STATUS]} Status
*/
3 changes: 3 additions & 0 deletions src/connection/tests/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ module.exports = (test) => {
let timelineProxy
const proxyHandler = {
set () {
// @ts-ignore - TS fails to infer here
return Reflect.set(...arguments)
}
}
Expand Down Expand Up @@ -138,7 +139,9 @@ module.exports = (test) => {
expect(connection.stat.timeline.close).to.not.exist()

await connection.close()
// @ts-ignore - fails to infer callCount
expect(proxyHandler.set.callCount).to.equal(1)
// @ts-ignore - fails to infer getCall
const [obj, key, value] = proxyHandler.set.getCall(0).args
expect(obj).to.eql(connection.stat.timeline)
expect(key).to.equal('close')
Expand Down
2 changes: 1 addition & 1 deletion src/crypto/tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const expect = chai.expect
chai.use(require('dirty-chai'))

const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe')
const { pipe } = require('it-pipe')
const PeerId = require('peer-id')
const { collect } = require('streaming-iterables')
const uint8arrayFromString = require('uint8arrays/from-string')
Expand Down
30 changes: 21 additions & 9 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
'use strict'
/* eslint-disable valid-jsdoc */

const debug = require('debug')
const EventEmitter = require('events')
const { EventEmitter } = require('events')
const errcode = require('err-code')

const pipe = require('it-pipe')
const { pipe } = require('it-pipe')

const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
Expand Down Expand Up @@ -46,7 +45,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {string} props.debugName - log namespace
* @param {Array<string>|string} props.multicodecs - protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @param {SignaturePolicyType} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @param {boolean} [props.canRelayMessage = false] - if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] - if publish should emit to self, if subscribed
* @abstract
Expand Down Expand Up @@ -226,6 +225,7 @@ class PubsubBaseProtocol extends EventEmitter {
const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream)

// @ts-ignore - peer.inboundStream maybe null
this._processMessages(idB58Str, peer.inboundStream, peer)
}

Expand All @@ -243,6 +243,7 @@ class PubsubBaseProtocol extends EventEmitter {
try {
const { stream, protocol } = await conn.newStream(this.multicodecs)
const peer = this._addPeer(peerId, protocol)
// @ts-ignore MuxedStream is not DuplexIterableStream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address this after merging. This should really be using the muxedStream

await peer.attachOutboundStream(stream)
} catch (err) {
this.log.err(err)
Expand All @@ -257,7 +258,7 @@ class PubsubBaseProtocol extends EventEmitter {
*
* @private
* @param {PeerId} peerId - peerId
* @param {Error} err - error for connection end
* @param {Error} [err] - error for connection end
*/
_onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String()
Expand Down Expand Up @@ -341,6 +342,7 @@ class PubsubBaseProtocol extends EventEmitter {
await pipe(
stream,
async (source) => {
// @ts-ignore - DuplexIterableStream isn't defined as iterable
for await (const data of source) {
const rpcBytes = data instanceof Uint8Array ? data : data.slice()
const rpcMsg = this._decodeRpc(rpcBytes)
Expand Down Expand Up @@ -395,7 +397,7 @@ class PubsubBaseProtocol extends EventEmitter {
* Handles a subscription change from a peer
*
* @param {string} id
* @param {RPC.SubOpt} subOpt
* @param {RPCSubOpts} subOpt
*/
_processRpcSubOpt (id, subOpt) {
const t = subOpt.topicID
Expand Down Expand Up @@ -457,7 +459,7 @@ class PubsubBaseProtocol extends EventEmitter {
* The default msgID implementation
* Child class can override this.
*
* @param {RPC.Message} msg - the message object
* @param {RPCMessage} msg - the message object
* @returns {Uint8Array} message id as bytes
*/
getMsgId (msg) {
Expand Down Expand Up @@ -590,8 +592,8 @@ class PubsubBaseProtocol extends EventEmitter {
* Should be used by the routers to create the message to send.
*
* @private
* @param {Message} message
* @returns {Promise<Message>}
* @param {RPCMessage} message
* @returns {Promise<RPCMessage>}
*/
_buildMessage (message) {
const signaturePolicy = this.globalSignaturePolicy
Expand Down Expand Up @@ -728,6 +730,16 @@ class PubsubBaseProtocol extends EventEmitter {
}
}

/**
* @typedef {any} Libp2p
* @typedef {import('./peer-streams').DuplexIterableStream} DuplexIterableStream
* @typedef {import('../connection/connection')} Connection
* @typedef {import('./message').RPC} RPC
* @typedef {import('./message').SubOpts} RPCSubOpts
* @typedef {import('./message').Message} RPCMessage
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
*/

module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils
Expand Down
8 changes: 8 additions & 0 deletions src/pubsub/message/sign.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async function verifySignature (message) {
const baseMessage = { ...message }
delete baseMessage.signature
delete baseMessage.key
// @ts-ignore - from is optional
baseMessage.from = PeerId.createFromCID(baseMessage.from).toBytes()
const bytes = uint8ArrayConcat([
SignPrefix,
Expand All @@ -50,6 +51,7 @@ async function verifySignature (message) {
const pubKey = await messagePublicKey(message)

// verify the base message
// @ts-ignore - may not have signature
return pubKey.verify(bytes, message.signature)
}

Expand All @@ -62,6 +64,7 @@ async function verifySignature (message) {
*/
async function messagePublicKey (message) {
// should be available in the from property of the message (peer id)
// @ts-ignore - from is optional
const from = PeerId.createFromCID(message.from)

if (message.key) {
Expand All @@ -78,6 +81,11 @@ async function messagePublicKey (message) {
}
}

/**
* @typedef {import('..').InMessage} InMessage
* @typedef {import('libp2p-crypto').PublicKey} PublicKey
*/

module.exports = {
messagePublicKey,
signMessage,
Expand Down
Loading