Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"extends": "ipfs"
}
27 changes: 27 additions & 0 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
on:
push:
branches:
- master
- main
- default
pull_request:
branches:
- '**'

name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [12.x]
steps:
- uses: actions/checkout@v1
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm install
- name: Typecheck
uses: gozala/[email protected]
25 changes: 19 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
"./test/utils/create-libp2p-node": false,
"./test/utils/create-temp-repo-nodejs.js": "./test/utils/create-temp-repo-browser.js"
},
"typesVersions": {
"*": {
"*": [
"dist/*"
]
}
},
"files": [
"dist",
"src"
Expand All @@ -17,14 +24,18 @@
"test:browser": "aegir test -t browser -t webworker",
"test:node": "aegir test -t node",
"lint": "aegir lint",
"check": "tsc --noEmit",
"release": "aegir release",
"release-minor": "aegir release --type minor",
"release-major": "aegir release --type major",
"bench": "node benchmarks/index",
"build": "aegir build",
"build": "npm run build:js & npm run build:types",
"build:js": "aegir build",
"build:types": "tsc --emitDeclarationOnly --declarationDir dist",
"coverage": "aegir coverage --provider codecov",
"docs": "aegir docs",
"benchmarks": "node test/benchmarks/get-many"
"benchmarks": "node test/benchmarks/get-many",
"prepare": "npm run build:types"
},
"repository": {
"type": "git",
Expand All @@ -43,7 +54,7 @@
"homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme",
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"aegir": "^26.0.0",
"aegir": "^27.0.0",
"benchmark": "^2.1.4",
"delay": "^4.3.0",
"ipfs-repo": "^6.0.1",
Expand All @@ -69,15 +80,17 @@
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^8.0.0"
"uuid": "^8.0.0",
"typescript": "^4.0.3",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"typescript": "^4.0.3",

this comes from aegir

"@types/debug": "^4.1.5"
},
"dependencies": {
"abort-controller": "^3.0.0",
"any-signal": "^1.1.0",
"bignumber.js": "^9.0.0",
"cids": "^1.0.0",
"debug": "^4.1.0",
"ipld-block": "^0.10.0",
"debug": "^4.2.0",
"ipld-block": "git://github.com/ipld/js-ipld-block#typegen",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
Expand Down
119 changes: 103 additions & 16 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ const TARGET_MESSAGE_SIZE = 16 * 1024
const MAX_SIZE_REPLACE_HAS_WITH_BLOCK = 1024

class DecisionEngine {
/**
*
* @param {PeerId} peerId
* @param {*} blockstore
* @param {import('../network')} network
* @param {Stats} stats
* @param {Object} [opts]
* @param {number} [opts.targetMessageSize]
* @param {number} [opts.maxSizeReplaceHasWithBlock]
*/
constructor (peerId, blockstore, network, stats, opts) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
Expand All @@ -34,6 +44,7 @@ class DecisionEngine {
this._opts = this._processOpts(opts)

// A list of of ledgers by their partner id
/** @type {Map<string, Ledger>} */
this.ledgerMap = new Map()
this._running = false

Expand Down Expand Up @@ -112,7 +123,7 @@ class DecisionEngine {

// If there's nothing in the message, bail out
if (msg.empty) {
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peerId is obtained on line 76 via following code:

 const { peerId, tasks, pendingSize } = this._requestQueue.popTasks(this._opts.targetMessageSize)

However popTasks returns an object without peerId in 2 code paths out of 3. At the same time tasksDone and messageSent below do not expect undefined.

It was not exactly clear what made most sense here, so I just conditioned those calls on peerIds presence, but it would be good to make sure all this makes sense.


// Trigger the next round of task processing
this._scheduleProcessTasks()
Expand All @@ -122,32 +133,36 @@ class DecisionEngine {

try {
// Send the message
await this.network.sendMessage(peerId, msg)
peerId && await this.network.sendMessage(peerId, msg)

// Peform sent message accounting
for (const block of blocks.values()) {
this.messageSent(peerId, block)
peerId && this.messageSent(peerId, block)
}
} catch (err) {
this._log.error(err)
}

// Free the tasks up from the request queue
this._requestQueue.tasksDone(peerId, tasks)
peerId && this._requestQueue.tasksDone(peerId, tasks)

// Trigger the next round of task processing
this._scheduleProcessTasks()
}

/**
* @param {PeerId} peerId
* @returns {Map<string, WantListEntry>}
*/
wantlistForPeer (peerId) {
const peerIdStr = peerId.toB58String()
if (!this.ledgerMap.has(peerIdStr)) {
return new Map()
}

return this.ledgerMap.get(peerIdStr).wantlist.sortedEntries()
const ledger = this.ledgerMap.get(peerIdStr)
return ledger ? ledger.wantlist.sortedEntries() : new Map()
}

/**
* @param {PeerId} peerId
*/
ledgerForPeer (peerId) {
const peerIdStr = peerId.toB58String()

Expand All @@ -164,12 +179,20 @@ class DecisionEngine {
}
}

/**
* @returns {PeerId[]}
*/
peers () {
return Array.from(this.ledgerMap.values()).map((l) => l.partner)
}

// Receive blocks either from an incoming message from the network, or from
// blocks being added by the client on the localhost (eg IPFS add)
/**
* Receive blocks either from an incoming message from the network, or from
* blocks being added by the client on the localhost (eg IPFS add)
*
* @param {Block[]} blocks
* @returns {void}
*/
receivedBlocks (blocks) {
if (!blocks.length) {
return
Expand Down Expand Up @@ -211,7 +234,13 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

// Handle incoming messages
/**
* Handle incoming messages
*
* @param {PeerId} peerId
* @param {Message} msg
* @returns {Promise<void>}
*/
async messageReceived (peerId, msg) {
const ledger = this._findOrCreate(peerId)

Expand Down Expand Up @@ -251,12 +280,24 @@ class DecisionEngine {
this._scheduleProcessTasks()
}

/**
* @private
* @param {PeerId} peerId
* @param {CID[]} cids
* @returns {void}
*/
_cancelWants (peerId, cids) {
for (const c of cids) {
this._requestQueue.remove(c.toString(), peerId)
}
}

/**
* @private
* @param {PeerId} peerId
* @param {BitswapMessageEntry[]} wants
* @returns {Promise<void>}
*/
async _addWants (peerId, wants) {
// Get the size of each wanted block
const blockSizes = await this._getBlockSizes(wants.map(w => w.cid))
Expand Down Expand Up @@ -320,11 +361,21 @@ class DecisionEngine {
blockSize <= this._opts.maxSizeReplaceHasWithBlock
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, number>>}
*/
async _getBlockSizes (cids) {
const blocks = await this._getBlocks(cids)
return new Map([...blocks].map(([k, v]) => [k, v.data.length]))
}

/**
* @private
* @param {CID[]} cids
* @returns {Promise<Map<string, Block>>}
*/
async _getBlocks (cids) {
const res = new Map()
await Promise.all(cids.map(async (cid) => {
Expand All @@ -347,7 +398,14 @@ class DecisionEngine {
})
}

// Clear up all accounting things after message was sent
/**
* Clear up all accounting things after message was sent
*
* @param {PeerId} peerId
* @param {Object} [block]
* @param {Uint8Array} block.data
* @param {CID} [block.cid]
*/
messageSent (peerId, block) {
const ledger = this._findOrCreate(peerId)
ledger.sentBytes(block ? block.data.length : 0)
Expand All @@ -356,15 +414,29 @@ class DecisionEngine {
}
}

/**
* @param {PeerId} peerId
* @returns {number}
*/
numBytesSentTo (peerId) {
return this._findOrCreate(peerId).accounting.bytesSent
}

/**
* @param {PeerId} peerId
* @returns {number}
*/

numBytesReceivedFrom (peerId) {
return this._findOrCreate(peerId).accounting.bytesRecv
}

peerDisconnected (peerId) {
/**
*
* @param {PeerId} _peerId
* @returns {void}
*/
peerDisconnected (_peerId) {
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
Expand All @@ -373,10 +445,16 @@ class DecisionEngine {
// in the peer request queue
}

/**
* @private
* @param {PeerId} peerId
* @returns {Ledger}
*/
_findOrCreate (peerId) {
const peerIdStr = peerId.toB58String()
if (this.ledgerMap.has(peerIdStr)) {
return this.ledgerMap.get(peerIdStr)
const ledger = this.ledgerMap.get(peerIdStr)
if (ledger) {
return ledger
}

const l = new Ledger(peerId)
Expand All @@ -399,3 +477,12 @@ class DecisionEngine {
}

module.exports = DecisionEngine

/**
* @typedef {import('../types').PeerId} PeerId
* @typedef {import('../stats')} Stats
* @typedef {import('../types').BlockData} BlockData
* @typedef {import('../types').Block} Block
* @typedef {import('../types/message/entry')} BitswapMessageEntry
* @typedef {import('../types/wantlist/entry')} WantListEntry
*/
51 changes: 51 additions & 0 deletions src/decision-engine/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
export interface TaskMerger {
/**
* Ggiven the existing tasks with the same topic, does the task add some new
* information? Used to decide whether to merge the task or ignore it.
*/
hasNewInfo (task:Task, tasksWithTopic:Task[]): boolean

/**
* Merge the information from the task into the existing pending task.
*/
merge (newTask, existingTask): void
}

export interface Task {
/**
* A name for the Task (like an id but not necessarily unique)
*/
topic: string
/**
* Priority for the Task (tasks are ordered by priority per peer).
*/
priority: number
/**
* The size of the task, e.g. the number of bytes in a block.
*/
size: number

data: TaskData
}

export interface TaskData {
/**
* The size of the block, if known (if we don't have the block this is zero)
*/
blockSize: number
/**
* Indicates if the request is for a block or for a HAVE.
*/
isWantBlock: boolean
/**
* Indicates if we have the block.
*/
haveBlock: boolean
/**
* Indicates whether to send a DONT_HAVE response if we don't have the block.
* If this is `false` and we don't have the block, we just ignore the
* want-block request (useful for discovery where we query lots of peers but
* don't want a response unless the peer has the block).
*/
sendDontHave: boolean
}
Loading