diff --git a/streams/src/components/auth/index.ts b/streams/src/components/auth/index.ts index 08efa0888..666d9f188 100644 --- a/streams/src/components/auth/index.ts +++ b/streams/src/components/auth/index.ts @@ -56,7 +56,7 @@ export class Auth extends Tube { ) { if ( msg.type === MessageType.RTSP && - statusCode(msg.data) === UNAUTHORIZED + statusCode(msg.data.toString()) === UNAUTHORIZED ) { const headers = msg.data.toString().split('\n') const wwwAuth = headers.find((header) => /WWW-Auth/i.test(header)) diff --git a/streams/src/components/rtsp-parser/parser.ts b/streams/src/components/rtsp-parser/parser.ts index 54816b64a..5cad06f39 100644 --- a/streams/src/components/rtsp-parser/parser.ts +++ b/streams/src/components/rtsp-parser/parser.ts @@ -1,3 +1,4 @@ +import { concat } from 'utils/bytes' import { rtcpMessageFromBuffer } from '../../utils/protocols/rtcp' import { bodyOffset, extractHeaderValue } from '../../utils/protocols/rtsp' import { messageFromBuffer } from '../../utils/protocols/sdp' @@ -96,18 +97,22 @@ export class Parser { _parseRtsp(): Array { const messages: Array = [] - const buffer = Buffer.concat(this._chunks) - const chunkBodyOffset = bodyOffset(buffer) + const data = concat(this._chunks) + const chunkBodyOffset = bodyOffset(data) // If last added chunk does not have the end of the header, return. if (chunkBodyOffset === -1) { return messages } const rtspHeaderLength = chunkBodyOffset - const contentLength = extractHeaderValue(buffer, 'Content-Length') + + const dec = new TextDecoder() + const header = dec.decode(data.subarray(0, rtspHeaderLength)) + + const contentLength = extractHeaderValue(header, 'Content-Length') if ( contentLength && - parseInt(contentLength) > buffer.length - rtspHeaderLength + parseInt(contentLength) > data.length - rtspHeaderLength ) { // we do not have the whole body return messages @@ -116,10 +121,11 @@ export class Parser { this._init() // resets this._chunks and this._length if ( - rtspHeaderLength === buffer.length || - buffer[rtspHeaderLength] === ASCII_DOLLAR + rtspHeaderLength === data.length || + data[rtspHeaderLength] === ASCII_DOLLAR ) { // No body in this chunk, assume there is no body? + const buffer = Buffer.from(data.buffer) const packet = buffer.slice(0, rtspHeaderLength) messages.push({ type: MessageType.RTSP, data: packet }) @@ -128,8 +134,8 @@ export class Parser { this._push(trailing) } else { // Body is assumed to be the remaining data of the last chunk. - const packet = buffer - const body = buffer.slice(rtspHeaderLength) + const packet = Buffer.from(data.buffer) + const body = Buffer.from(data.buffer).slice(rtspHeaderLength) messages.push({ type: MessageType.RTSP, data: packet }) messages.push(messageFromBuffer(body)) diff --git a/streams/src/components/rtsp-session/index.ts b/streams/src/components/rtsp-session/index.ts index 4946e16e4..f209ace58 100644 --- a/streams/src/components/rtsp-session/index.ts +++ b/streams/src/components/rtsp-session/index.ts @@ -273,9 +273,12 @@ export class RtspSession extends Tube { _onRtsp(msg: RtspMessage) { this._waiting = false - const status = statusCode(msg.data) - const ended = connectionEnded(msg.data) - const seq = sequence(msg.data) + const dec = new TextDecoder() + const content = dec.decode(msg.data) + + const status = statusCode(content) + const ended = connectionEnded(content) + const seq = sequence(content) if (seq === null) { throw new Error('rtsp: expected sequence number') } @@ -284,11 +287,11 @@ export class RtspSession extends Tube { } const method = this._callHistory[seq - 1] - debug('msl:rtsp:incoming')(`${msg.data}`) + debug('msl:rtsp:incoming')(`${content}`) if (!this._sessionId && !ended) { // Response on first SETUP - this._sessionId = sessionId(msg.data) - const _sessionTimeout = sessionTimeout(msg.data) + this._sessionId = sessionId(content) + const _sessionTimeout = sessionTimeout(content) if (_sessionTimeout !== null) { // The server specified that sessions will timeout if not renewed. // In order to keep it alive we need periodically send a RTSP_OPTIONS message @@ -306,20 +309,20 @@ export class RtspSession extends Tube { } if (!this._contentBase) { - this._contentBase = contentBase(msg.data) + this._contentBase = contentBase(content) } if (!this._contentLocation) { - this._contentLocation = contentLocation(msg.data) + this._contentLocation = contentLocation(content) } if (status >= 400) { // TODO: Retry in certain cases? this.onError && - this.onError(new RTSPResponseError(msg.data.toString('ascii'), status)) + this.onError(new RTSPResponseError(content, status)) } if (method === RTSP_METHOD.PLAY) { // When starting to play, send the actual range to an external handler. - this.onPlay && this.onPlay(range(msg.data)) + this.onPlay && this.onPlay(range(content)) } if (ended) { diff --git a/streams/src/utils/bytes.ts b/streams/src/utils/bytes.ts new file mode 100644 index 000000000..b9531a9eb --- /dev/null +++ b/streams/src/utils/bytes.ts @@ -0,0 +1,13 @@ +export function concat(bufs: Uint8Array[]): Uint8Array { + let byteLength = 0 + for (const b of bufs) { + byteLength += b.length + } + const buf = new Uint8Array(byteLength) + let offset = 0 + for (const b of bufs) { + buf.set(b, offset) + offset += b.length + } + return buf +} diff --git a/streams/src/utils/protocols/rtsp.ts b/streams/src/utils/protocols/rtsp.ts index 7610966ba..231dd2249 100644 --- a/streams/src/utils/protocols/rtsp.ts +++ b/streams/src/utils/protocols/rtsp.ts @@ -38,56 +38,59 @@ * .... */ +export const ASCII = { + LF: 10, + CR: 13, +} as const + /** * Extract the value of a header. * * @param buffer The response bytes - * @param header The header to search for + * @param key The header to search for */ -export const extractHeaderValue = (buffer: Buffer, header: string) => { - const anchor = `\n${header.toLowerCase()}: ` - const start = buffer.toString().toLowerCase().indexOf(anchor) +export const extractHeaderValue = (header: string, key: string) => { + const anchor = `\n${key.toLowerCase()}: ` + const start = header.toLowerCase().indexOf(anchor) if (start >= 0) { - const end = buffer.indexOf('\n', start + anchor.length) - const headerValue = buffer - .toString('ascii', start + anchor.length, end) - .trim() + const end = header.indexOf('\n', start + anchor.length) + const headerValue = header.substring(start + anchor.length, end).trim() return headerValue } return null } -export const sequence = (buffer: Buffer) => { +export const sequence = (header: string) => { /** * CSeq = "CSeq" HCOLON cseq-nr * cseq-nr = 1*9DIGIT */ - const val = extractHeaderValue(buffer, 'CSeq') + const val = extractHeaderValue(header, 'CSeq') if (val !== null) { return Number(val) } return null } -export const sessionId = (buffer: Buffer) => { +export const sessionId = (header: string) => { /** * Session = "Session" HCOLON session-id * [ SEMI "timeout" EQUAL delta-seconds ] * session-id = 1*256( ALPHA / DIGIT / safe ) * delta-seconds = 1*19DIGIT */ - const val = extractHeaderValue(buffer, 'Session') + const val = extractHeaderValue(header, 'Session') return val ? val.split(';')[0] : null } -export const sessionTimeout = (buffer: Buffer) => { +export const sessionTimeout = (header: string) => { /** * Session = "Session" HCOLON session-id * [ SEMI "timeout" EQUAL delta-seconds ] * session-id = 1*256( ALPHA / DIGIT / safe ) * delta-seconds = 1*19DIGIT */ - const val = extractHeaderValue(buffer, 'Session') + const val = extractHeaderValue(header, 'Session') if (val === null) { return null } @@ -103,35 +106,35 @@ export const sessionTimeout = (buffer: Buffer) => { return defaultTimeout } -export const statusCode = (buffer: Buffer) => { - return Number(buffer.toString('ascii', 9, 12)) +export const statusCode = (header: string) => { + return Number(header.substring(9, 12)) } -export const contentBase = (buffer: Buffer) => { +export const contentBase = (header: string) => { /** * Content-Base = "Content-Base" HCOLON RTSP-URI */ - return extractHeaderValue(buffer, 'Content-Base') + return extractHeaderValue(header, 'Content-Base') } -export const contentLocation = (buffer: Buffer) => { +export const contentLocation = (header: string) => { /** * Content-Location = "Content-Location" HCOLON RTSP-REQ-Ref */ - return extractHeaderValue(buffer, 'Content-Location') + return extractHeaderValue(header, 'Content-Location') } -export const connectionEnded = (buffer: Buffer) => { +export const connectionEnded = (header: string) => { /** * Connection = "Connection" HCOLON connection-token * *(COMMA connection-token) * connection-token = "close" / token */ - const connectionToken = extractHeaderValue(buffer, 'Connection') + const connectionToken = extractHeaderValue(header, 'Connection') return connectionToken !== null && connectionToken.toLowerCase() === 'close' } -export const range = (buffer: Buffer) => { +export const range = (header: string) => { /** * Range = "Range" HCOLON ranges-spec * ranges-spec = npt-range / utc-range / smpte-range @@ -154,38 +157,56 @@ export const range = (buffer: Buffer) => { // Example range headers: // Range: npt=now- // Range: npt=1154.598701-3610.259146 - const npt = extractHeaderValue(buffer, 'Range') + const npt = extractHeaderValue(header, 'Range') if (npt !== null) { return npt.split('=')[1].split('-') } return undefined } +interface HeaderTerminator { + byteLength: number + sequence: string + startByte: number +} +const headerTerminators: HeaderTerminator[] = [ + // expected + { sequence: '\r\n\r\n', startByte: ASCII.CR, byteLength: 4 }, + // legacy compatibility + { sequence: '\r\r', startByte: ASCII.CR, byteLength: 2 }, + { sequence: '\n\n', startByte: ASCII.LF, byteLength: 2 }, +] + /** * Determine the offset of the RTSP body, where the header ends. * If there is no header ending, -1 is returned * @param chunk - A piece of data * @return The body offset, or -1 if no header end found */ -export const bodyOffset = (chunk: Buffer) => { - /** - * Strictly speaking, it seems RTSP MUST have CRLF and doesn't allow CR or LF on its own. - * That means that the end of the header part should be a pair of CRLF, but we're being - * flexible here and also allow LF LF or CR CR instead of CRLF CRLF. - */ - const bodyOffsets = ['\n\n', '\r\r', '\r\n\r\n'] - .map((s) => { - const offset = chunk.indexOf(s) - if (offset !== -1) { - return offset + s.length +export const bodyOffset = (chunk: Uint8Array) => { + // Strictly speaking, it seems RTSP MUST have CRLF and doesn't allow CR or LF on its own. + // That means that the end of the header part should be a pair of CRLF, but we're being + // flexible here and also allow LF LF or CR CR instead of CRLF CRLF (should be handled + // according to version 1.0) + const dec = new TextDecoder() + + for (const terminator of headerTerminators) { + const terminatorOffset = chunk.findIndex((value, index, array) => { + if (value === terminator.startByte) { + const candidate = dec.decode( + array.slice(index, index + terminator.byteLength) + ) + + if (candidate === terminator.sequence) { + return true + } } - return offset - }) - .filter((offset) => offset !== -1) - if (bodyOffsets.length > 0) { - return bodyOffsets.reduce((acc, offset) => { - return Math.min(acc, offset) + return false }) + if (terminatorOffset !== -1) { + return terminatorOffset + terminator.byteLength + } } + return -1 } diff --git a/streams/tests/protocols-rtsp.test.ts b/streams/tests/protocols-rtsp.test.ts index f47e984ae..acc6073e5 100644 --- a/streams/tests/protocols-rtsp.test.ts +++ b/streams/tests/protocols-rtsp.test.ts @@ -22,65 +22,62 @@ import { describe } from './uvu-describe' describe('sequence', (test) => { test('should return an int', () => { - assert.is(sequence(Buffer.from(sdpResponse)), 3) - assert.is(sequence(Buffer.from(setupResponse)), 5) - assert.is(sequence(Buffer.from(optionsResponseLowerCase)), 1) + assert.is(sequence(sdpResponse), 3) + assert.is(sequence(setupResponse), 5) + assert.is(sequence(optionsResponseLowerCase), 1) }) }) describe('sessionId', (test) => { test('should be a null before SETUP', () => { - assert.is(sessionId(Buffer.from(sdpResponse)), null) + assert.is(sessionId(sdpResponse), null) }) test('should be present in a SETUP response', () => { - assert.is(sessionId(Buffer.from(setupResponse)), 'Bk48Ak7wjcWaAgRD') + assert.is(sessionId(setupResponse), 'Bk48Ak7wjcWaAgRD') }) test('should be present in a TEARDOWN response', () => { - assert.is(sessionId(Buffer.from(teardownResponse)), 'ZyHdf8Mn.$epq_8Z') + assert.is(sessionId(teardownResponse), 'ZyHdf8Mn.$epq_8Z') }) }) describe('sessionTimeout', (test) => { test('should be null before SETUP', () => { - assert.is(sessionTimeout(Buffer.from(sdpResponse)), null) + assert.is(sessionTimeout(sdpResponse), null) }) test('should be extracted correctly when in a SETUP response', () => { - assert.is(sessionTimeout(Buffer.from(setupResponse)), 120) + assert.is(sessionTimeout(setupResponse), 120) }) test('should be 60 when not specified in a SETUP response', () => { - assert.is(sessionTimeout(Buffer.from(setupResponseNoTimeout)), 60) + assert.is(sessionTimeout(setupResponseNoTimeout), 60) }) }) describe('statusCode', (test) => { test('should return an integer', () => { - assert.is(statusCode(Buffer.from(sdpResponseLive555)), 200) - assert.is(statusCode(Buffer.from(teardownResponse)), 200) + assert.is(statusCode(sdpResponseLive555), 200) + assert.is(statusCode(teardownResponse), 200) }) }) describe('contentBase', (test) => { test('should return correct contentBase', () => { assert.is( - contentBase(Buffer.from(sdpResponse)), + contentBase(sdpResponse), 'rtsp://192.168.0.3/axis-media/media.amp/' ) }) test('should return correct contentBase using live555', () => { - assert.is( - contentBase(Buffer.from(sdpResponseLive555)), - 'rtsp://127.0.0.1:8554/out.svg/' - ) + assert.is(contentBase(sdpResponseLive555), 'rtsp://127.0.0.1:8554/out.svg/') }) }) describe('connectionEnded', (test) => { test('should be true in a TEARDOWN response', () => { - assert.is(connectionEnded(Buffer.from(teardownResponse)), true) + assert.is(connectionEnded(teardownResponse), true) }) test('should be false otherwise', () => { - assert.is(connectionEnded(Buffer.from(setupResponse)), false) + assert.is(connectionEnded(setupResponse), false) }) })