Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
on:
push:
branches:
- master
- main
- default
pull_request:
branches:
- '**'

name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [12.x]
steps:
- uses: actions/checkout@v1
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm install
- name: Typecheck
uses: gozala/[email protected]
29 changes: 23 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@
"./test/utils/create-libp2p-node": false,
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"types": "dist/src/index.d.ts",
"typesVersions": {
"*": {
"src/*": [
"dist/src/*",
"dist/src/*/index"
]
}
},
"eslintConfig": {
"extends": "ipfs"
},
"files": [
"dist",
"src"
],
"scripts": {
"prepare": "aegir build",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need full build here

Suggested change
"prepare": "aegir build",
"prepare": "aegir ts -p check",

"test": "aegir test",
"test:browser": "aegir test -t browser -t webworker",
"test:node": "aegir test -t node",
"lint": "aegir lint",
"check": "aegir ts -p check",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"bench": "node benchmarks/index",
"build": "aegir build",
"coverage": "aegir coverage --provider codecov",
"docs": "aegir docs",
"benchmarks": "node test/benchmarks/get-many"
Expand All @@ -43,11 +56,12 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^28.1.0",
"@types/debug": "^4.1.5",
"aegir": "^29.2.0",
"benchmark": "^2.1.4",
"delay": "^4.3.0",
"ipfs-repo": "^7.0.0",
"ipfs-utils": "^4.0.0",
"ipfs-utils": "^5.0.1",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.2",
"it-drain": "^1.0.1",
Expand All @@ -69,22 +83,24 @@
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"typescript": "^4.0.5",
"uuid": "^8.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^2.1.1",
"bignumber.js": "^9.0.0",
"cids": "^1.0.0",
"debug": "^4.1.0",
"debug": "^4.2.0",
"ipld-block": "^0.11.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^0.7.1",
"libp2p-interfaces": "^0.8.1",
"moving-average": "^1.0.0",
"multicodec": "^2.0.0",
"multihashing-async": "^2.0.1",
"native-abort-controller": "0.0.3",
"protons": "^2.0.0",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0",
Expand Down Expand Up @@ -115,6 +131,7 @@
"dmitriy ryajov <[email protected]>",
"Dmitriy Ryajov <[email protected]>",
"Bryan Stenson <[email protected]>",
"Richard Schneider <[email protected]>"
"Richard Schneider <[email protected]>",
"Irakli Gozalishvili <[email protected]>"
]
}
120 changes: 104 additions & 16 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ const TARGET_MESSAGE_SIZE = 16 * 1024
const MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024

class DecisionEngine {
/**
*
* @param {PeerId} peerId
* @param {BlockStore} blockstore
* @param {import('../network')} network
* @param {Stats} stats
* @param {Object} [opts]
* @param {number} [opts.targetMessageSize]
* @param {number} [opts.maxSizeReplaceHasWithBlock]
*/
constructor (peerId, blockstore, network, stats, opts) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
Expand All @@ -34,6 +44,7 @@ class DecisionEngine {
this._opts = this._processOpts(opts)

// A list of of ledgers by their partner id
/** @type {Map<string, Ledger>} */
this.ledgerMap = new Map()
this._running = false

Expand Down Expand Up @@ -112,7 +123,7 @@ class DecisionEngine {

// If there's nothing in the message, bail out
if (msg.empty) {
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peerId is obtained on line 76 via following code:

 const { peerId, tasks, pendingSize } = this._requestQueue.popTasks(this._opts.targetMessageSize)

However popTasks returns an object without peerId in 2 code paths out of 3. At the same time tasksDone and messageSent below do not expect undefined.

It was not exactly clear what made most sense here, so I just conditioned those calls on peerIds presence, but it would be good to make sure all this makes sense.


// Trigger the next round of task processing
this._scheduleProcessTasks()
Expand All @@ -122,32 +133,36 @@ class DecisionEngine {

try {
// Send the message
await this.network.sendMessage(peerId, msg)
peerId && await this.network.sendMessage(peerId, msg)

// Peform sent message accounting
for (const block of blocks.values()) {
this.messageSent(peerId, block)
peerId && this.messageSent(peerId, block)
}
} catch (err) {
this._log.error(err)
}

// Free the tasks up from the request queue
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)

// Trigger the next round of task processing
this._scheduleProcessTasks()
}

/**
* @param {PeerId} peerId
* @returns {Map<string, WantListEntry>}
*/
wantlistForPeer (peerId) {
const peerIdStr = peerId.toB58String()
if (!this.ledgerMap.has(peerIdStr)) {
return new Map()
}

return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries()
const ledger = this.ledgerMap.get(peerIdStr)
return ledger ? ledger.wantlist.sortedEntries() : new Map()
}

/**
* @param {PeerId} peerId
*/
ledgerForPeer (peerId) {
const peerIdStr = peerId.toB58String()

Expand All @@ -164,12 +179,20 @@ class DecisionEngine {
}
}

/**
* @returns {PeerId[]}
*/
peers () {
return Array.from(this.ledgerMap.values()).map((l) => l.partner)
}

// Receive blocks either from an incoming message from the network, or from
// blocks being added by the client on the localhost (eg IPFS add)
/**
* Receive blocks either from an incoming message from the network, or from
* blocks being added by the client on the localhost (eg IPFS add)
*
* @param {Block[]} blocks
* @returns {void}
*/
receivedBlocks (blocks) {
if (!blocks.length) {
return
Expand Down Expand Up @@ -211,7 +234,13 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

// Handle incoming messages
/**
* Handle incoming messages
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<void>}
*/
async messageReceived (peerId, msg) {
const ledger = this._findOrCreate(peerId)

Expand Down Expand Up @@ -251,12 +280,24 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

/**
* @private
* @param {PeerId} peerId
* @param {CID[]} cids
* @returns {void}
*/
_cancelWants (peerId, cids) {
for (const c of cids) {
this._requestQueue.remove(c.toString(), peerId)
}
}

/**
* @private
* @param {PeerId} peerId
* @param {BitswapMessageEntry[]} wants
* @returns {Promise<void>}
*/
async _addWants (peerId, wants) {
// Get the size of each wanted block
const blockSizes = await this._getBlockSizes(wants.map(w => w.cid))
Expand Down Expand Up @@ -320,11 +361,21 @@ class DecisionEngine {
blockSize <= this._opts.maxSizeReplaceHasWithBlock
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, number>>}
*/
async _getBlockSizes (cids) {
const blocks = await this._getBlocks(cids)
return new Map([...blocks].map(([k, v]) => [k, v.data.length]))
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, Block>>}
*/
async _getBlocks (cids) {
const res = new Map()
await Promise.all(cids.map(async (cid) => {
Expand All @@ -347,7 +398,14 @@ class DecisionEngine {
})
}

// Clear up all accounting things after message was sent
/**
* Clear up all accounting things after message was sent
*
* @param {PeerId} peerId
* @param {Object} [block]
* @param {Uint8Array} block.data
* @param {CID} [block.cid]
*/
messageSent (peerId, block) {
const ledger = this._findOrCreate(peerId)
ledger.sentBytes(block ? block.data.length : 0)
Expand All @@ -356,15 +414,29 @@ class DecisionEngine {
}
}

/**
* @param {PeerId} peerId
* @returns {number}
*/
numBytesSentTo (peerId) {
return this._findOrCreate(peerId).accounting.bytesSent
}

/**
* @param {PeerId} peerId
* @returns {number}
*/

numBytesReceivedFrom (peerId) {
return this._findOrCreate(peerId).accounting.bytesRecv
}

peerDisconnected (peerId) {
/**
*
* @param {PeerId} _peerId
* @returns {void}
*/
peerDisconnected (_peerId) {
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
Expand All @@ -373,10 +445,16 @@ class DecisionEngine {
// in the peer request queue
}

/**
* @private
* @param {PeerId} peerId
* @returns {Ledger}
*/
_findOrCreate (peerId) {
const peerIdStr = peerId.toB58String()
if (this.ledgerMap.has(peerIdStr)) {
return this.ledgerMap.get(peerIdStr)
const ledger = this.ledgerMap.get(peerIdStr)
if (ledger) {
return ledger
}

const l = new Ledger(peerId)
Expand All @@ -399,3 +477,13 @@ class DecisionEngine {
}

module.exports = DecisionEngine

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('../stats')} Stats
* @typedef {import('../types').BlockData} BlockData
* @typedef {import('ipld-block')} Block
* @typedef {import('../types/message/entry')} BitswapMessageEntry
* @typedef {import('../types/wantlist/entry')} WantListEntry
* @typedef {import('../types').BlockStore} BlockStore
*/
Loading