Skip to content

Commit

Permalink
Simplify everything
Browse files Browse the repository at this point in the history
  • Loading branch information
LuKks committed Feb 5, 2024
1 parent 4514175 commit deb453e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 129 deletions.
132 changes: 45 additions & 87 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,83 @@ const { RTCPeerConnection, RTCIceCandidate } = require('werift')
const Protomux = require('protomux')
const c = require('compact-encoding')
const safetyCatch = require('safety-catch')
const ReadyResource = require('ready-resource')
const WebStream = require('./lib/web-stream.js')

// TODO: Investigate how to deploy STUN servers
const iceServers = [
{ urls: 'stun:stun.l.google.com:19302' }
]

// TODO: This could be a Duplex but don't know about that, for now emitting an event is good enough
module.exports = class WebPeer extends ReadyResource {
constructor (stream) {
super()
module.exports = class WebPeer {
constructor (relay) {
this._rtc = new RTCPeerConnection({ iceServers })
this._relay = relay
this._mux = new Protomux(relay) // Protomux.from

this.peer = new RTCPeerConnection({ iceServers })
this.stream = stream
this.mux = null // new Protomux(stream)
this.channel = null
this.remote = null
this._channel = this._mux.createChannel({ protocol: 'hyper-webrtc/signal' })

this.peer.onicecandidate = onicecandidate.bind(this)
if (this._channel === null) throw new Error('Channel duplicated')

this.ready().catch(safetyCatch)
}

async _open () {
this.mux = new Protomux(this.stream)

this.channel = this.mux.createChannel({
protocol: 'hyper-webrtc/signaling',
id: null,
handshake: c.json, // TODO: Make strict messages
onopen: this._onmuxopen.bind(this),
onerror: this._onmuxerror.bind(this),
onclose: this._onmuxclose.bind(this),
messages: [
{ encoding: c.json, onmessage: this._onwireice.bind(this) },
{ encoding: c.json, onmessage: this._onwireoffer.bind(this) },
{ encoding: c.json, onmessage: this._onwireanswer.bind(this) }
]
})

this.channel.userData = this
this._ice = this._channel.addMessage({ encoding: c.any, onmessage: this._onice.bind(this) })
this._offer = this._channel.addMessage({ encoding: c.any, onmessage: this._onoffer.bind(this) })
this._answer = this._channel.addMessage({ encoding: c.any, onmessage: this._onanswer.bind(this) })

this.channel.open({
// isInitiator: this.mux.stream.isInitiator,
})

// TODO: Maximize speed at connecting, e.g. don't wait until open
}
this._channel.open()

_close () {
console.log('_closed')
this.peer.close()
this.stream.destroy()
this._rtc.onicecandidate = onicecandidate.bind(this)
}

async _onmuxopen (handshake) {
console.log('_onmuxopen', handshake)

// const rawStream = this.peer.createDataChannel('wire', { negotiated: true, id: 0 })
static from (relay) {
const peer = new this(relay)

const done = (rawStream) => {
this.remote = new WebStream(this.mux.stream.isInitiator, rawStream, {
handshakeHash: this.mux.stream.handshakeHash
}) // new SecretStream(false, rawStream)
const rawStream = peer._rtc.createDataChannel('wire', { negotiated: true, id: 0 })

this.remote.on('close', () => {
console.log('remote closed')
this.close().catch(safetyCatch)
})
const stream = new WebStream(relay.isInitiator, rawStream, {
handshakeHash: relay.handshakeHash
})

this.emit('continue', this.remote) // TODO: It should be a Duplex to avoid this event
}
relay.on('close', () => {
peer._rtc.close()
rawStream.close()
})

if (this.mux.stream.isInitiator) {
const rawStream = this.peer.createDataChannel('wire')
done(rawStream)
stream.on('close', () => {
peer._rtc.close()
relay.destroy()
})

const offer = await this.peer.createOffer()
await this.peer.setLocalDescription(offer)
peer.negotiate().catch(safetyCatch)

this.channel.messages[1].send({ offer: this.peer.localDescription })
} else {
this.peer.ondatachannel = (e) => {
const rawStream = e.channel
done(rawStream)
}
}
return stream
}

async _onwireice ({ ice }) {
await this.peer.addIceCandidate(new RTCIceCandidate(ice))
}
async negotiate () {
if (!this._relay.isInitiator) return

async _onwireoffer ({ offer }) {
await this.peer.setRemoteDescription(offer)
const offer = await this._rtc.createOffer()
await this._rtc.setLocalDescription(offer)

const answer = await this.peer.createAnswer()
await this.peer.setLocalDescription(answer)

this.channel.messages[2].send({ answer: this.peer.localDescription })
this._offer.send({ offer: this._rtc.localDescription })
}

async _onwireanswer ({ answer }) {
await this.peer.setRemoteDescription(answer)
async _onice ({ ice }) {
await this._rtc.addIceCandidate(new RTCIceCandidate(ice))
}

_onmuxerror (err) {
console.error('_onmuxerror', err)
}
async _onoffer ({ offer }) {
await this._rtc.setRemoteDescription(offer)

_onmuxclose (isRemote) {
console.log('_onmuxclose', { isRemote }, 'Stream created?', !!this.remote)
const answer = await this._rtc.createAnswer()
await this._rtc.setLocalDescription(answer)

if (!this.remote) this.peer.close()
this._answer.send({ answer: this._rtc.localDescription })
}

this.stream.destroy()
async _onanswer ({ answer }) {
await this._rtc.setRemoteDescription(answer)
}
}

function onicecandidate (e) {
if (e.candidate) this.channel.messages[0].send({ ice: e.candidate })
if (e.candidate) this._ice.send({ ice: e.candidate })
}
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@
"url": "https://github.com/LuKks/hyper-webrtc/issues"
},
"homepage": "https://github.com/LuKks/hyper-webrtc#readme",
"devDependencies": {
"brittle": "^3.3.2",
"standard": "^17.1.0"
},
"dependencies": {
"@hyperswarm/dht-relay": "^0.4.3",
"b4a": "^1.6.4",
"compact-encoding": "^2.13.0",
"protomux": "^3.5.1",
"safety-catch": "^1.0.2",
"streamx": "^2.15.7",
"werift": "^0.19.0"
},
"devDependencies": {
"brittle": "^3.3.2",
"hypercore": "^10.32.6",
"hypercore-crypto": "^3.4.0",
"hyperdht": "^6.11.5",
"hyperswarm": "^4.7.13",
"protomux": "^3.5.1",
"random-access-memory": "^6.2.0",
"ready-resource": "^1.0.0",
"sodium-universal": "^4.0.0",
"werift": "^0.19.0",
"standard": "^17.1.0",
"ws": "^8.16.0"
}
}
46 changes: 12 additions & 34 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ const Hypercore = require('hypercore')
const RAM = require('random-access-memory')
const crypto = require('hypercore-crypto')
const HypercoreId = require('hypercore-id-encoding')
const b4a = require('b4a')
const HyperWebRTC = require('./index.js')

test('basic', async function (t) {
// t.plan(1)
t.plan(3)

const bootstrap = await createBootstrap(t)
const relayAddress = createRelayServer(t, { bootstrap })

const key = await writer(t, { relayAddress })

await reader(t, key, { relayAddress })

// await new Promise(resolve => setTimeout(resolve, 5000))
})

async function writer (t, { relayAddress }) {
Expand All @@ -38,27 +37,15 @@ async function writer (t, { relayAddress }) {
t.teardown(() => core.close())

const done = core.findingPeers()
swarm.on('connection', function (signal) {
const peer = new HyperWebRTC(signal)

// t.teardown(() => peer.close())

peer.on('continue', function (stream) {
console.log('core replicate')
t.teardown(() => stream.destroy())

const s = core.replicate(stream)
stream.on('close', () => s.destroy())
})
swarm.on('connection', function (relay) {
const rtc = HyperWebRTC.from(relay)
core.replicate(rtc)
})
const discoveryWeb = crypto.discoveryKey(core.discoveryKey)
const discovery = swarm.join(discoveryWeb)
swarm.flush().then(done, done)

await discovery.flushed()
console.log('Fully announced')

console.log('Writer ID', core.id)

return core.id
}
Expand All @@ -73,34 +60,25 @@ async function reader (t, key, { relayAddress }) {
t.teardown(() => clone.close())

const done = clone.findingPeers()
swarm.on('connection', function (signal) {
const peer = new HyperWebRTC(signal)

// t.teardown(() => peer.close())

peer.on('continue', function (stream) {
console.log('clone replicate')
t.teardown(() => stream.destroy())

const s = clone.replicate(stream)
stream.on('close', () => s.destroy())
})
swarm.on('connection', function (relay) {
const rtc = HyperWebRTC.from(relay)
clone.replicate(rtc)
})
const discoveryWeb = crypto.discoveryKey(clone.discoveryKey)
swarm.join(discoveryWeb)
swarm.flush().then(done, done)

console.log(await clone.get(0))
console.log(await clone.get(1))
console.log(await clone.get(2))
t.alike(await clone.get(0), b4a.from('a'))
t.alike(await clone.get(1), b4a.from('b'))
t.alike(await clone.get(2), b4a.from('c'))
}

function createRelayClient (t, relayAddress) {
const ws = new WebSocket(relayAddress)
const dht = new DHTRelay(new Stream(true, ws))
// TODO: dht-relay does not have 'close' event

t.teardown(() => dht.destroy({ force: true }), { order: Infinity })
t.teardown(() => dht.destroy(), { order: Infinity })

return dht
}
Expand Down

0 comments on commit deb453e

Please sign in to comment.