Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Awesome DHT #86

Merged
merged 8 commits into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const WS = require('libp2p-websockets')
const spdy = require('libp2p-spdy')
const secio = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-kad-dht')

class Node extends libp2p {
constructor (peerInfo, peerBook, options) {
Expand All @@ -95,7 +96,9 @@ class Node extends libp2p {
},
discovery: [
new MulticastDNS(peerInfo, 'your-identifier')
]
],
// DHT is passed as its own enabling PeerRouting, ContentRouting and DHT itself components
dht: DHT
}

super(modules, peerInfo, peerBook, options)
Expand Down Expand Up @@ -144,6 +147,36 @@ class Node extends libp2p {

`callback` is a function with the following `function (err) {}` signature, where `err` is an Error in case stopping the node fails.

#### `libp2p.peerRouting.findPeer(id, callback)`

> Looks up for multiaddrs of a peer in the DHT

- `id`: instance of [PeerId][]

#### `libp2p.contentRouting.findProviders(key, timeout, callback)`

- `key`: Buffer
- `timeout`: Number miliseconds

#### `libp2p.contentRouting.provide(key, timeout, callback)`

- `key`: Buffer
- `timeout`: Number miliseconds

#### `libp2p.dht.put(key, value, callback)`

- `key`: Buffer
- `value`: Buffer

#### `libp2p.dht.get(key, callback)`

- `key`: Buffer

#### `libp2p.dht.getMany(key, nVals, callback)`

- `key`: Buffer
- `nVals`: Number

#### `libp2p.handle(protocol, handlerFunc [, matchFunc])`

> Handle new protocol
Expand Down
163 changes: 115 additions & 48 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const assert = require('assert')

const setImmediate = require('async/setImmediate')
const each = require('async/each')
const series = require('async/series')

const Ping = require('libp2p-ping')
const Swarm = require('libp2p-swarm')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const mafmt = require('mafmt')
const PeerBook = require('peer-book')
const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const Ping = require('libp2p-ping')
const setImmediate = require('async/setImmediate')

exports = module.exports

Expand All @@ -32,9 +36,7 @@ class Node extends EventEmitter {
if (this.modules.connection.muxer) {
let muxers = this.modules.connection.muxer
muxers = Array.isArray(muxers) ? muxers : [muxers]
muxers.forEach((muxer) => {
this.swarm.connection.addStreamMuxer(muxer)
})
muxers.forEach((muxer) => this.swarm.connection.addStreamMuxer(muxer))

// If muxer exists, we can use Identify
this.swarm.connection.reuse()
Expand Down Expand Up @@ -73,9 +75,49 @@ class Node extends EventEmitter {
// Mount default protocols
Ping.mount(this.swarm)

// Not fully implemented in js-libp2p yet
this.routing = undefined
this.records = undefined
// dht provided components (peerRouting, contentRouting, dht)
if (_modules.DHT) {
this._dht = new this.modules.DHT(this, 20, _options.DHT && _options.DHT.datastore)
}

this.peerRouting = {
findPeer: (id, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.findPeer(id, callback)
}
}

this.contentRouting = {
findProviders: (key, timeout, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.findProviders(key, timeout, callback)
},
provide: (key, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.provide(key, callback)
}
}

this.dht = {
put: (key, value, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.put(key, value, callback)
},
get: (key, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.get(key, callback)
},
getMany (key, nVals, callback) {
assert(this._dht, 'DHT is not available')

this._dht.getMany(key, nVals, callback)
}
}
}

/*
Expand Down Expand Up @@ -117,24 +159,30 @@ class Node extends EventEmitter {
}
})

this.swarm.listen((err) => {
if (err) {
return callback(err)
}
if (ws) {
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
series([
(cb) => this.swarm.listen(cb),
(cb) => {
// listeners on, libp2p is on
this.isOnline = true

if (ws) {
// always add dialing on websockets
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
}

// all transports need to be setup before discover starts
if (this.modules.discovery) {
return each(this.modules.discovery, (d, cb) => d.start(cb), cb)
}
cb()
},
(cb) => {
if (this._dht) {
return this._dht.start(cb)
}
cb()
}

this.isOnline = true

if (this.modules.discovery) {
this.modules.discovery.forEach((discovery) => {
setImmediate(() => discovery.start(() => {}))
})
}

callback()
})
], callback)
}

/*
Expand All @@ -149,7 +197,15 @@ class Node extends EventEmitter {
})
}

this.swarm.close(callback)
series([
(cb) => {
if (this._dht) {
return this._dht.stop(cb)
}
cb()
},
(cb) => this.swarm.close(cb)
], callback)
}

isOn () {
Expand All @@ -158,8 +214,13 @@ class Node extends EventEmitter {

ping (peer, callback) {
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
const peerInfo = this._getPeerInfo(peer)
callback(null, new Ping(this.swarm, peerInfo))
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}

callback(null, new Ping(this.swarm, peerInfo))
})
}

dial (peer, protocol, callback) {
Expand All @@ -170,27 +231,31 @@ class Node extends EventEmitter {
protocol = undefined
}

let peerInfo
try {
peerInfo = this._getPeerInfo(peer)
} catch (err) {
return callback(err)
}

this.swarm.dial(peerInfo, protocol, (err, conn) => {
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}
this.peerBook.put(peerInfo)
callback(null, conn)

this.swarm.dial(peerInfo, protocol, (err, conn) => {
if (err) {
return callback(err)
}
this.peerBook.put(peerInfo)
callback(null, conn)
})
})
}

hangUp (peer, callback) {
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
const peerInfo = this._getPeerInfo(peer)

this.swarm.hangUp(peerInfo, callback)
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}

this.swarm.hangUp(peerInfo, callback)
})
}

handle (protocol, handlerFunc, matchFunc) {
Expand All @@ -204,10 +269,12 @@ class Node extends EventEmitter {
/*
* Helper method to check the data type of peer and convert it to PeerInfo
*/
_getPeerInfo (peer) {
_getPeerInfo (peer, callback) {
let p
// PeerInfo
if (PeerInfo.isPeerInfo(peer)) {
p = peer
// Multiaddr instance (not string)
} else if (multiaddr.isMultiaddr(peer)) {
const peerIdB58Str = peer.getPeerId()
try {
Expand All @@ -216,19 +283,19 @@ class Node extends EventEmitter {
p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str))
}
p.multiaddrs.add(peer)
// PeerId
} else if (PeerId.isPeerId(peer)) {
const peerIdB58Str = peer.toB58String()
try {
p = this.peerBook.get(peerIdB58Str)
} catch (err) {
// TODO this is where PeerRouting comes into place
throw new Error('No knowledge about: ' + peerIdB58Str)
return this.peerRouting.findPeer(peer, callback)
}
} else {
throw new Error('peer type not recognized')
return setImmediate(() => callback(new Error('peer type not recognized')))
}

return p
setImmediate(() => callback(null, p))
}
}

Expand Down