Skip to content

Commit

Permalink
Add Terminate Message to Hub
Browse files Browse the repository at this point in the history
  • Loading branch information
sinclairzx81 committed Aug 11, 2024
1 parent f0ef88e commit 7f56cfb
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 34 deletions.
5 changes: 3 additions & 2 deletions src/http/fetch.mts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ THE SOFTWARE.

import { HttpListenerRequestInit, HttpListenerResponseInit } from './listener.mjs'

import * as Async from '../async/index.mjs'
import * as Buffer from '../buffer/index.mjs'
import * as Stream from '../stream/index.mjs'
import * as Net from '../net/index.mjs'
Expand Down Expand Up @@ -120,7 +121,7 @@ export async function fetch(net: Net.NetModule, endpoint: string, requestInit: R
// send request body to server
sendRequestBody(duplex, requestInit).catch((error) => console.error(error))
// read server response signal
const signal = await readResponseSignal(duplex)
const signal = await Async.timeout(readResponseSignal(duplex), { timeout: 4000, error: new Error('A timeout occured reading the http response signal') })
if (signal === null) {
await duplex.close()
throw Error(`Connection to ${endpoint} terminated unexpectedly`)
Expand All @@ -131,7 +132,7 @@ export async function fetch(net: Net.NetModule, endpoint: string, requestInit: R
throw Error('Server is using alternate protocol')
}
// read server response init
const responseInit = await readListenerResponseInit(duplex)
const responseInit = await Async.timeout(readListenerResponseInit(duplex), { timeout: 4000, error: new Error('A timeout occured reading http response init') })
if (responseInit === null) {
await duplex.close()
throw Error('Unable to parse server response headers')
Expand Down
17 changes: 13 additions & 4 deletions src/webrtc/webrtc.mts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { WebRtcTrackListener, WebRtcTrackListenerAcceptCallback } from './track/
// ------------------------------------------------------------------
// WebRtcMessage
// ------------------------------------------------------------------
export type WebRtcMessage = WebRtcCandidateMessage | WebRtcDescriptionMessage
export type WebRtcMessage = WebRtcCandidateMessage | WebRtcDescriptionMessage | WebRtcTerminateMessage

export type WebRtcCandidateMessage = {
type: 'candidate'
Expand All @@ -45,6 +45,9 @@ export type WebRtcDescriptionMessage = {
type: 'description'
description: RTCSessionDescription
}
export type WebRtcTerminateMessage = {
type: 'terminate'
}
// ------------------------------------------------------------------
// WebRtcPeer
// ------------------------------------------------------------------
Expand Down Expand Up @@ -107,6 +110,11 @@ export class WebRtcModule implements Dispose.Dispose {
open.reject(new Error(`Connection to '${remoteAddress}:${port}' timed out`))
}, 4000)
return await open.promise()
}
/** Terminates the RTCPeerConnection associated with this remoteAddress and asks the remote peer to do the same */
public async terminate(remoteAddress: string) {
this.#hub.send({ to: remoteAddress, data: { type: 'terminate' }})
await this.#terminateConnection(remoteAddress)
}
// ------------------------------------------------------------------
// Media
Expand Down Expand Up @@ -170,7 +178,7 @@ export class WebRtcModule implements Dispose.Dispose {
lock.dispose()
}
}
async onHubCandidate(message: Hubs.HubMessage<WebRtcCandidateMessage>) {
async #onHubCandidate(message: Hubs.HubMessage<WebRtcCandidateMessage>) {
if (message.data.candidate === null) return
const peer = await this.#resolvePeer(message.from)
try {
Expand All @@ -184,7 +192,8 @@ export class WebRtcModule implements Dispose.Dispose {
const data = message.data as WebRtcMessage
switch (data.type) {
case 'description': return this.#onHubDescription(message as never)
case 'candidate': return this.onHubCandidate(message as never)
case 'candidate': return this.#onHubCandidate(message as never)
case 'terminate': return this.#terminateConnection(message.from)
}
}
// ------------------------------------------------------------------
Expand Down Expand Up @@ -217,7 +226,7 @@ export class WebRtcModule implements Dispose.Dispose {
if(!this.#channelListeners.has(port)) return datachannel.close()
const listener = this.#channelListeners.get(port)!
event.channel.addEventListener('close', () => peer.datachannels.delete(datachannel))
peer.datachannels.add(datachannel)
peer.datachannels.add(datachannel)
listener.accept(peer, datachannel)
}
#onPeerTrack(peer: WebRtcPeer, event: RTCTrackEvent) {
Expand Down
12 changes: 9 additions & 3 deletions test/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import './os/index.mjs'
// Runner
// ------------------------------------------------------------------
declare const Drift: any
Test.run({ filter: Drift.args[0] }).then((result) => {
return result.success ? Drift.close(0) : Drift.close(1)
})
if ('Drift' in globalThis) {
Test.run({ filter: Drift.args[0] }).then((result) => {
return result.success ? Drift.close(0) : Drift.close(1)
})
} else {
Test.run({ filter: '' }).then((result) => {
console.log('done', result)
})
}
53 changes: 28 additions & 25 deletions test/test/reporter.mts
Original file line number Diff line number Diff line change
Expand Up @@ -107,66 +107,69 @@ export class StdoutReporter implements Reporter {
#write(message: string) {
this.#writer.write(Buffer.encode(message))
}
#ansi(message: string) {
this.#writer.write(Buffer.encode(message))
}
// ----------------------------------------------------------------
// Handlers
// ----------------------------------------------------------------
public onContextBegin(context: DescribeContext) {
if (context.name === 'root') return
this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.color.lightBlue)
this.#write(context.name)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#newline()
}
public onContextEnd(context: DescribeContext) {
if (context.name === 'root') return
this.#newline()
}
public onUnitBegin(unit: ItContext) {
this.#write(Ansi.color.gray)
this.#ansi(Ansi.color.gray)
this.#write(` - ${unit.name}`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
}
public onUnitEnd(unit: ItContext) {
if (unit.error === null) {
this.#write(Ansi.color.green)
this.#ansi(Ansi.color.green)
this.#write(` pass`)
this.#write(Ansi.reset)
this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.reset)
this.#ansi(Ansi.color.lightBlue)
this.#write(` ${unit.elapsed.toFixed()} ms`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
} else {
this.#write(Ansi.color.lightRed)
this.#ansi(Ansi.color.lightRed)
this.#write(' fail')
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
}
this.#newline()
}
#printFailureSummary(context: DescribeContext) {
for (const error of context.failures()) {
this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.color.lightBlue)
this.#write(`${error.context} `)
this.#write(Ansi.reset)
this.#write(Ansi.color.gray)
this.#ansi(Ansi.reset)
this.#ansi(Ansi.color.gray)
this.#write(`${error.unit}`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#newline()
this.#newline()

this.#write(Ansi.color.lightRed)
this.#write(` error`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${error.error.message}`)
this.#newline()
if (error.error instanceof Assert.AssertError) {
this.#write(Ansi.color.lightGreen)
this.#ansi(Ansi.color.lightGreen)
this.#write(` expect`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${ValueFormatter.format(error.error.expect)}`)
this.#newline()

this.#write(Ansi.color.lightRed)
this.#ansi(Ansi.color.lightRed)
this.#write(` actual`)
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${ValueFormatter.format(error.error.actual)}`)
this.#newline()
}
Expand All @@ -175,21 +178,21 @@ export class StdoutReporter implements Reporter {
this.#newline()
}
#printCompletionSummary(context: DescribeContext) {
this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.color.lightBlue)
this.#write('elapsed')
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${context.elapsed.toFixed(0)} ms`)
this.#newline()

this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.color.lightBlue)
this.#write('passed')
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${context.passCount}`)
this.#newline()

this.#write(Ansi.color.lightBlue)
this.#ansi(Ansi.color.lightBlue)
this.#write('failed')
this.#write(Ansi.reset)
this.#ansi(Ansi.reset)
this.#write(`: ${context.failCount}`)
this.#newline()
}
Expand Down

0 comments on commit 7f56cfb

Please sign in to comment.