Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
LuKks committed Jan 30, 2024
1 parent 6923796 commit 866b3cd
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 210 deletions.
246 changes: 64 additions & 182 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,89 +1,54 @@
const { Peer } = require('peerjs') // => STUN/TURN + RTCPeerConnection
const { RTCPeerConnection, RTCIceCandidate } = require('werift')
const Protomux = require('protomux')
const c = require('compact-encoding')
const safetyCatch = require('safety-catch')
const ReadyResource = require('ready-resource')
const sodium = require('sodium-universal')
const b4a = require('b4a')
const WebStream = require('./lib/web-stream.js')

const iceServers = [
{ urls: 'stun:stun.l.google.com:19302' }
]

// TODO: This could be a Duplex but don't know about that, for now emitting an event is good enough
module.exports = class WebPeer extends ReadyResource {
constructor (stream) {
super()

const id = b4a.toString(randomBytes(8), 'hex')

// this.peer = new Peer(id)
this.peer = new Peer()
this.peer = new RTCPeerConnection({ iceServers })
this.stream = stream
// this.mux = new Protomux(stream)
this.mux = null // new Protomux(stream)
this.channel = null

this.handshake = null
this.token = b4a.toString(randomBytes(8), 'hex') // TODO: Think another solution for validating connections
this.remote = null

this.peer.onicecandidate = onicecandidate.bind(this)

this.ready().catch(safetyCatch)
}

async _open () {
// TODO: Investigate about reusing the relayed handshake to create new SecretStream instances
/* const onhandshake = () => this.handshake = getHandshake(this.stream)
if (this.stream.handshakeHash) onhandshake() // Or this.stream._encrypt
else this.stream.once('handshake', onhandshake) */

this.peer.on('connection', rawStream => {
console.log('peerjs incoming', rawStream)

// TODO: Check metadata.token before accepting the connection

rawStream.on('open', () => {
console.log('rawStream open')

// if (!this.handshake) throw new Error('No handshake')

const duplex = new WebStream(rawStream)

this.remote = duplex // new SecretStream(false, duplex)
// this.remote.on('data', (data) => console.log(data))
// this.remote.on('close', () => console.log('remote closed'))
this.remote.on('error', console.error)

// const done = () => this.mux.destroy()
// waitForRemote(this.remote).then(done, done)

this.emit('continue', this.remote)
})

rawStream.on('error', function (err) {
console.log('rawStream error', err)
})

rawStream.on('close', function () {
console.log('rawStream close')
})
})

try {
await waitForPeer(this.peer)
} catch (err) {
console.error(err)
this.stream.destroy()
throw err
}

this.mux = new Protomux(this.stream)

this._attachChannel()
this.channel = this.mux.createChannel({
protocol: 'hyper-webrtc/signaling',
id: null,
handshake: c.json, // TODO: Make strict messages
onopen: this._onmuxopen.bind(this),
onerror: this._onmuxerror.bind(this),
onclose: this._onmuxclose.bind(this),
messages: [
{ encoding: c.json, onmessage: this._onwireice.bind(this) },
{ encoding: c.json, onmessage: this._onwireoffer.bind(this) },
{ encoding: c.json, onmessage: this._onwireanswer.bind(this) }
]
})

console.log('peer.id', this.peer.id)
this.channel.userData = this

this.channel.open({
// isInitiator: this.mux.stream.isInitiator,
id: this.peer.id,
token: this.token
})

// TODO: Maximize speed at connecting, e.g. don't wait until open
}

_close () {
Expand All @@ -92,72 +57,54 @@ module.exports = class WebPeer extends ReadyResource {
this.stream.destroy()
}

_attachChannel () {
const channel = this.mux.createChannel({
protocol: 'hyperconnection',
id: null,
handshake: c.json, // TODO: Make strict messages
onopen: this._onmuxopen.bind(this),
onerror: this._onmuxerror.bind(this),
onclose: this._onmuxclose.bind(this),
messages: [
// { encoding: c.json, onmessage: this._onmuxmessage }
]
})

if (channel === null) return

this.channel = channel
}

_onmuxopen (handshake) {
async _onmuxopen (handshake) {
console.log('_onmuxopen', handshake)

if (this.mux.stream.isInitiator) {
console.log('Connecting to', handshake.id)

// TODO: Investigate if metadata is kept truly private between both peers (E2E encrypted, not publicly stored in the middle server, etc)
const rawStream = this.peer.connect(handshake.id, {
reliable: true,
/* metadata: {
token: this.token
} */
})

rawStream.on('open', () => {
console.log('rawStream open')

// if (!this.handshake) throw new Error('No handshake')
// const rawStream = this.peer.createDataChannel('wire', { negotiated: true, id: 0 })

const duplex = new WebStream(rawStream)
const done = (rawStream) => {
this.remote = new WebStream(this.mux.stream.isInitiator, rawStream, {
handshakeHash: this.mux.stream.handshakeHash
}) // new SecretStream(false, rawStream)

this.remote = duplex // new SecretStream(true, duplex)
this.remote.once('connect', () => {
this.emit('continue', this.remote) // TODO: It should be a Duplex to avoid this event
})
}

/* this.remote.on('connect', () => {
console.log('remote connected')
})
if (this.mux.stream.isInitiator) {
const rawStream = this.peer.createDataChannel('wire')
done(rawStream)

const offer = await this.peer.createOffer()
await this.peer.setLocalDescription(offer)

this.channel.messages[1].send({ offer: this.peer.localDescription })
} else {
this.peer.ondatachannel = (e) => {
const rawStream = e.channel
done(rawStream)
}
}
}

this.remote.on('open', () => {
console.log('remote opened')
}) */
_onwireice ({ ice }) {
this.peer.addIceCandidate(new RTCIceCandidate(ice))
}

this.remote.on('error', console.error)
async _onwireoffer ({ offer }) {
// TODO: Only once

// TODO: Can destroy it right away?
// const done = () => this.mux.destroy()
// waitForRemote(this.remote).then(done, done)
this.peer.setRemoteDescription(offer)

this.emit('continue', this.remote)
})
const answer = await this.peer.createAnswer()
await this.peer.setLocalDescription(answer)

rawStream.on('error', function (err) {
console.log('rawStream error', err)
})
this.channel.messages[2].send({ answer: this.peer.localDescription })
}

rawStream.on('close', function () {
console.log('rawStream close')
})
}
_onwireanswer ({ answer }) {
this.peer.setRemoteDescription(answer)
}

_onmuxerror (err) {
Expand All @@ -172,71 +119,6 @@ module.exports = class WebPeer extends ReadyResource {
}
}

/* function getHandshake (stream) {
return {
publicKey: stream.publicKey,
remotePublicKey: stream.remotePublicKey,
hash: stream.handshakeHash,
tx: stream.tx || stream._encrypt?.key || null,
rx: stream.rx || stream._decrypt?.key || null
}
} */

function waitForRemote (remote) {
return new Promise(resolve => {
remote.on('open', done)
remote.on('error', done)
remote.on('close', done)

function done () {
remote.off('open', done)
remote.off('error', done)
remote.off('close', done)

resolve()
}
})
}

// TODO: Simplify a bit
function waitForPeer (peer) {
return new Promise((resolve, reject) => {
if (peer.disconnected === true) {
reject(new Error('Peer is disconnected'))
return
}

if (peer.destroyed) {
reject(new Error('Peer is destroyed'))
return
}

peer.on('open', onopen)
peer.on('error', onclose)
peer.on('close', onclose)

function onopen (id) {
cleanup()
resolve()
}

function onclose (err) {
cleanup()

if (err) reject(err)
else reject(new Error('Could not create peer'))
}

function cleanup () {
peer.off('open', onopen)
peer.off('error', onclose)
peer.off('close', onclose)
}
})
}

function randomBytes (n) {
const buf = b4a.allocUnsafe(n)
sodium.randombytes_buf(buf)
return buf
function onicecandidate (e) {
if (e.candidate) this.channel.messages[0].send({ ice: e.candidate })
}
Loading

0 comments on commit 866b3cd

Please sign in to comment.