Skip to content

Commit 2628cde

Browse files
committed
fix: add close method to HTTP source
Allow to terminate HTTP source by aborting the fetch, and closing the response stream. Properly catch any errors and cancel the stream before aborting the fetch.
1 parent b376855 commit 2628cde

File tree

2 files changed

+80
-38
lines changed

2 files changed

+80
-38
lines changed

lib/components/http-source/index.ts

+58-38
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,27 @@ import { MessageType } from '../message'
55

66
const debug = registerDebug('msl:http-source')
77

8-
interface Headers {
9-
[key: string]: string
10-
}
11-
128
export interface HttpConfig {
139
uri: string
14-
headers?: Headers
10+
options?: RequestInit
1511
}
1612

1713
export class HttpSource extends Source {
1814
public uri: string
19-
public headers?: Headers
15+
public options?: RequestInit
2016
public length?: number
2117

2218
private _reader?: ReadableStreamDefaultReader<Uint8Array>
19+
private _abortController?: AbortController
20+
private _allDone: boolean
2321

2422
/**
2523
* Create an HTTP component.
2624
*
2725
* The constructor sets a single readable stream from a fetch.
2826
*/
2927
constructor(config: HttpConfig) {
30-
const { uri, headers } = config
31-
28+
const { uri, options } = config
3229
/**
3330
* Set up an incoming stream and attach it to the socket.
3431
*/
@@ -56,16 +53,23 @@ export class HttpSource extends Source {
5653
}
5754

5855
this.uri = uri
59-
this.headers = headers
56+
this.options = options
57+
this._allDone = false
6058
}
6159

6260
play(): void {
6361
if (this.uri === undefined) {
6462
throw new Error('cannot start playing when there is no URI')
6563
}
6664

65+
this._abortController = new AbortController()
66+
6767
this.length = 0
68-
fetch(this.uri, { headers: this.headers })
68+
fetch(this.uri, {
69+
credentials: 'include',
70+
signal: this._abortController.signal,
71+
...this.options,
72+
})
6973
.then((rsp) => {
7074
if (rsp.body === null) {
7175
throw new Error('empty response body')
@@ -75,41 +79,57 @@ export class HttpSource extends Source {
7579
this._pull()
7680
})
7781
.catch((err) => {
78-
throw new Error(err)
82+
console.error('http-source: fetch failed: ', err)
7983
})
8084
}
8185

86+
abort(): void {
87+
this._reader &&
88+
this._reader.cancel().catch((err) => {
89+
console.log('http-source: cancel reader failed: ', err)
90+
})
91+
this._abortController && this._abortController.abort()
92+
}
93+
8294
_pull(): void {
8395
if (this._reader === undefined) {
8496
return
8597
}
8698

87-
this._reader.read().then(({ done, value }) => {
88-
if (done) {
89-
debug('fetch completed, total downloaded: ', this.length, ' bytes')
90-
this.incoming.push(null)
91-
return
92-
}
93-
if (value === undefined) {
94-
throw new Error('expected value to be defined')
95-
}
96-
if (this.length === undefined) {
97-
throw new Error('expected length to be defined')
98-
}
99-
this.length += value.length
100-
const buffer = Buffer.from(value)
101-
if (!this.incoming.push({ data: buffer, type: MessageType.RAW })) {
102-
// Something happened down stream that it is no longer processing the
103-
// incoming data, and the stream buffer got full.
104-
// This could be because we are downloading too much data at once,
105-
// or because the downstream is frozen. The latter is most likely
106-
// when dealing with a live stream (as in that case we would expect
107-
// downstream to be able to handle the data).
108-
debug('downstream back pressure: pausing read')
109-
} else {
110-
// It's ok to read more data
111-
this._pull()
112-
}
113-
})
99+
this._reader
100+
.read()
101+
.then(({ done, value }) => {
102+
if (done) {
103+
if (!this._allDone) {
104+
debug('fetch completed, total downloaded: ', this.length, ' bytes')
105+
this.incoming.push(null)
106+
}
107+
this._allDone = true
108+
return
109+
}
110+
if (value === undefined) {
111+
throw new Error('expected value to be defined')
112+
}
113+
if (this.length === undefined) {
114+
throw new Error('expected length to be defined')
115+
}
116+
this.length += value.length
117+
const buffer = Buffer.from(value)
118+
if (!this.incoming.push({ data: buffer, type: MessageType.RAW })) {
119+
// Something happened down stream that it is no longer processing the
120+
// incoming data, and the stream buffer got full.
121+
// This could be because we are downloading too much data at once,
122+
// or because the downstream is frozen. The latter is most likely
123+
// when dealing with a live stream (as in that case we would expect
124+
// downstream to be able to handle the data).
125+
debug('downstream back pressure: pausing read')
126+
} else {
127+
// It's ok to read more data
128+
this._pull()
129+
}
130+
})
131+
.catch((err) => {
132+
console.error('http-source: read failed: ', err)
133+
})
114134
}
115135
}

lib/pipelines/http-mse-pipeline.ts

+22
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ export interface HttpMseConfig {
1111
export class HttpMsePipeline extends Pipeline {
1212
public http: HttpSource
1313

14+
private _src?: HttpSource
15+
private _sink: MseSink
16+
1417
constructor(config: HttpMseConfig) {
1518
const { http: httpConfig, mediaElement } = config
1619

@@ -20,7 +23,26 @@ export class HttpMsePipeline extends Pipeline {
2023

2124
super(httpSource, mp4Parser, mseSink)
2225

26+
this._src = httpSource
27+
this._sink = mseSink
28+
2329
// Expose session for external use
2430
this.http = httpSource
2531
}
32+
33+
close() {
34+
this._src && this._src.abort()
35+
}
36+
37+
get currentTime() {
38+
return this._sink.currentTime
39+
}
40+
41+
play() {
42+
return this._sink.play()
43+
}
44+
45+
pause() {
46+
return this._sink.pause()
47+
}
2648
}

0 commit comments

Comments
 (0)