From 7f56cfb617dbacd9a2c8847226dbeed0ff9536c8 Mon Sep 17 00:00:00 2001 From: sinclair Date: Sun, 11 Aug 2024 15:35:36 +0900 Subject: [PATCH] Add Terminate Message to Hub --- src/http/fetch.mts | 5 ++-- src/webrtc/webrtc.mts | 17 ++++++++++---- test/index.mts | 12 +++++++--- test/test/reporter.mts | 53 ++++++++++++++++++++++-------------------- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/src/http/fetch.mts b/src/http/fetch.mts index 728e1ac..48a25d4 100644 --- a/src/http/fetch.mts +++ b/src/http/fetch.mts @@ -28,6 +28,7 @@ THE SOFTWARE. import { HttpListenerRequestInit, HttpListenerResponseInit } from './listener.mjs' +import * as Async from '../async/index.mjs' import * as Buffer from '../buffer/index.mjs' import * as Stream from '../stream/index.mjs' import * as Net from '../net/index.mjs' @@ -120,7 +121,7 @@ export async function fetch(net: Net.NetModule, endpoint: string, requestInit: R // send request body to server sendRequestBody(duplex, requestInit).catch((error) => console.error(error)) // read server response signal - const signal = await readResponseSignal(duplex) + const signal = await Async.timeout(readResponseSignal(duplex), { timeout: 4000, error: new Error('A timeout occured reading the http response signal') }) if (signal === null) { await duplex.close() throw Error(`Connection to ${endpoint} terminated unexpectedly`) @@ -131,7 +132,7 @@ export async function fetch(net: Net.NetModule, endpoint: string, requestInit: R throw Error('Server is using alternate protocol') } // read server response init - const responseInit = await readListenerResponseInit(duplex) + const responseInit = await Async.timeout(readListenerResponseInit(duplex), { timeout: 4000, error: new Error('A timeout occured reading http response init') }) if (responseInit === null) { await duplex.close() throw Error('Unable to parse server response headers') diff --git a/src/webrtc/webrtc.mts b/src/webrtc/webrtc.mts index ebd6e94..279fde1 100644 --- a/src/webrtc/webrtc.mts +++ b/src/webrtc/webrtc.mts @@ -35,7 +35,7 @@ import { WebRtcTrackListener, WebRtcTrackListenerAcceptCallback } from './track/ // ------------------------------------------------------------------ // WebRtcMessage // ------------------------------------------------------------------ -export type WebRtcMessage = WebRtcCandidateMessage | WebRtcDescriptionMessage +export type WebRtcMessage = WebRtcCandidateMessage | WebRtcDescriptionMessage | WebRtcTerminateMessage export type WebRtcCandidateMessage = { type: 'candidate' @@ -45,6 +45,9 @@ export type WebRtcDescriptionMessage = { type: 'description' description: RTCSessionDescription } +export type WebRtcTerminateMessage = { + type: 'terminate' +} // ------------------------------------------------------------------ // WebRtcPeer // ------------------------------------------------------------------ @@ -107,6 +110,11 @@ export class WebRtcModule implements Dispose.Dispose { open.reject(new Error(`Connection to '${remoteAddress}:${port}' timed out`)) }, 4000) return await open.promise() + } + /** Terminates the RTCPeerConnection associated with this remoteAddress and asks the remote peer to do the same */ + public async terminate(remoteAddress: string) { + this.#hub.send({ to: remoteAddress, data: { type: 'terminate' }}) + await this.#terminateConnection(remoteAddress) } // ------------------------------------------------------------------ // Media @@ -170,7 +178,7 @@ export class WebRtcModule implements Dispose.Dispose { lock.dispose() } } - async onHubCandidate(message: Hubs.HubMessage) { + async #onHubCandidate(message: Hubs.HubMessage) { if (message.data.candidate === null) return const peer = await this.#resolvePeer(message.from) try { @@ -184,7 +192,8 @@ export class WebRtcModule implements Dispose.Dispose { const data = message.data as WebRtcMessage switch (data.type) { case 'description': return this.#onHubDescription(message as never) - case 'candidate': return this.onHubCandidate(message as never) + case 'candidate': return this.#onHubCandidate(message as never) + case 'terminate': return this.#terminateConnection(message.from) } } // ------------------------------------------------------------------ @@ -217,7 +226,7 @@ export class WebRtcModule implements Dispose.Dispose { if(!this.#channelListeners.has(port)) return datachannel.close() const listener = this.#channelListeners.get(port)! event.channel.addEventListener('close', () => peer.datachannels.delete(datachannel)) - peer.datachannels.add(datachannel) + peer.datachannels.add(datachannel) listener.accept(peer, datachannel) } #onPeerTrack(peer: WebRtcPeer, event: RTCTrackEvent) { diff --git a/test/index.mts b/test/index.mts index 83e1389..f8e8e2e 100644 --- a/test/index.mts +++ b/test/index.mts @@ -20,6 +20,12 @@ import './os/index.mjs' // Runner // ------------------------------------------------------------------ declare const Drift: any -Test.run({ filter: Drift.args[0] }).then((result) => { - return result.success ? Drift.close(0) : Drift.close(1) -}) +if ('Drift' in globalThis) { + Test.run({ filter: Drift.args[0] }).then((result) => { + return result.success ? Drift.close(0) : Drift.close(1) + }) +} else { + Test.run({ filter: '' }).then((result) => { + console.log('done', result) + }) +} diff --git a/test/test/reporter.mts b/test/test/reporter.mts index 50223b5..8d66426 100644 --- a/test/test/reporter.mts +++ b/test/test/reporter.mts @@ -107,14 +107,17 @@ export class StdoutReporter implements Reporter { #write(message: string) { this.#writer.write(Buffer.encode(message)) } + #ansi(message: string) { + this.#writer.write(Buffer.encode(message)) + } // ---------------------------------------------------------------- // Handlers // ---------------------------------------------------------------- public onContextBegin(context: DescribeContext) { if (context.name === 'root') return - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.color.lightBlue) this.#write(context.name) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#newline() } public onContextEnd(context: DescribeContext) { @@ -122,51 +125,51 @@ export class StdoutReporter implements Reporter { this.#newline() } public onUnitBegin(unit: ItContext) { - this.#write(Ansi.color.gray) + this.#ansi(Ansi.color.gray) this.#write(` - ${unit.name}`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) } public onUnitEnd(unit: ItContext) { if (unit.error === null) { - this.#write(Ansi.color.green) + this.#ansi(Ansi.color.green) this.#write(` pass`) - this.#write(Ansi.reset) - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.reset) + this.#ansi(Ansi.color.lightBlue) this.#write(` ${unit.elapsed.toFixed()} ms`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) } else { - this.#write(Ansi.color.lightRed) + this.#ansi(Ansi.color.lightRed) this.#write(' fail') - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) } this.#newline() } #printFailureSummary(context: DescribeContext) { for (const error of context.failures()) { - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.color.lightBlue) this.#write(`${error.context} `) - this.#write(Ansi.reset) - this.#write(Ansi.color.gray) + this.#ansi(Ansi.reset) + this.#ansi(Ansi.color.gray) this.#write(`${error.unit}`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#newline() this.#newline() this.#write(Ansi.color.lightRed) this.#write(` error`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${error.error.message}`) this.#newline() if (error.error instanceof Assert.AssertError) { - this.#write(Ansi.color.lightGreen) + this.#ansi(Ansi.color.lightGreen) this.#write(` expect`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${ValueFormatter.format(error.error.expect)}`) this.#newline() - this.#write(Ansi.color.lightRed) + this.#ansi(Ansi.color.lightRed) this.#write(` actual`) - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${ValueFormatter.format(error.error.actual)}`) this.#newline() } @@ -175,21 +178,21 @@ export class StdoutReporter implements Reporter { this.#newline() } #printCompletionSummary(context: DescribeContext) { - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.color.lightBlue) this.#write('elapsed') - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${context.elapsed.toFixed(0)} ms`) this.#newline() - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.color.lightBlue) this.#write('passed') - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${context.passCount}`) this.#newline() - this.#write(Ansi.color.lightBlue) + this.#ansi(Ansi.color.lightBlue) this.#write('failed') - this.#write(Ansi.reset) + this.#ansi(Ansi.reset) this.#write(`: ${context.failCount}`) this.#newline() }