Skip to content

Commit

Permalink
chore(streams): rtsp-parser Buffer -> Uint8Array
Browse files Browse the repository at this point in the history
  • Loading branch information
steabert committed Dec 5, 2024
1 parent 998d5e2 commit 92f6b03
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 78 deletions.
2 changes: 1 addition & 1 deletion streams/src/components/auth/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class Auth extends Tube {
) {
if (
msg.type === MessageType.RTSP &&
statusCode(msg.data) === UNAUTHORIZED
statusCode(msg.data.toString()) === UNAUTHORIZED
) {
const headers = msg.data.toString().split('\n')
const wwwAuth = headers.find((header) => /WWW-Auth/i.test(header))
Expand Down
22 changes: 14 additions & 8 deletions streams/src/components/rtsp-parser/parser.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { concat } from 'utils/bytes'
import { rtcpMessageFromBuffer } from '../../utils/protocols/rtcp'
import { bodyOffset, extractHeaderValue } from '../../utils/protocols/rtsp'
import { messageFromBuffer } from '../../utils/protocols/sdp'
Expand Down Expand Up @@ -96,18 +97,22 @@ export class Parser {
_parseRtsp(): Array<RtspMessage | SdpMessage> {
const messages: Array<RtspMessage | SdpMessage> = []

const buffer = Buffer.concat(this._chunks)
const chunkBodyOffset = bodyOffset(buffer)
const data = concat(this._chunks)
const chunkBodyOffset = bodyOffset(data)
// If last added chunk does not have the end of the header, return.
if (chunkBodyOffset === -1) {
return messages
}

const rtspHeaderLength = chunkBodyOffset
const contentLength = extractHeaderValue(buffer, 'Content-Length')

const dec = new TextDecoder()
const header = dec.decode(data.subarray(0, rtspHeaderLength))

const contentLength = extractHeaderValue(header, 'Content-Length')
if (
contentLength &&
parseInt(contentLength) > buffer.length - rtspHeaderLength
parseInt(contentLength) > data.length - rtspHeaderLength
) {
// we do not have the whole body
return messages
Expand All @@ -116,10 +121,11 @@ export class Parser {
this._init() // resets this._chunks and this._length

if (
rtspHeaderLength === buffer.length ||
buffer[rtspHeaderLength] === ASCII_DOLLAR
rtspHeaderLength === data.length ||
data[rtspHeaderLength] === ASCII_DOLLAR
) {
// No body in this chunk, assume there is no body?
const buffer = Buffer.from(data.buffer)
const packet = buffer.slice(0, rtspHeaderLength)
messages.push({ type: MessageType.RTSP, data: packet })

Expand All @@ -128,8 +134,8 @@ export class Parser {
this._push(trailing)
} else {
// Body is assumed to be the remaining data of the last chunk.
const packet = buffer
const body = buffer.slice(rtspHeaderLength)
const packet = Buffer.from(data.buffer)
const body = Buffer.from(data.buffer).slice(rtspHeaderLength)

messages.push({ type: MessageType.RTSP, data: packet })
messages.push(messageFromBuffer(body))
Expand Down
23 changes: 13 additions & 10 deletions streams/src/components/rtsp-session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,12 @@ export class RtspSession extends Tube {
_onRtsp(msg: RtspMessage) {
this._waiting = false

const status = statusCode(msg.data)
const ended = connectionEnded(msg.data)
const seq = sequence(msg.data)
const dec = new TextDecoder()
const content = dec.decode(msg.data)

const status = statusCode(content)
const ended = connectionEnded(content)
const seq = sequence(content)
if (seq === null) {
throw new Error('rtsp: expected sequence number')
}
Expand All @@ -284,11 +287,11 @@ export class RtspSession extends Tube {
}
const method = this._callHistory[seq - 1]

debug('msl:rtsp:incoming')(`${msg.data}`)
debug('msl:rtsp:incoming')(`${content}`)
if (!this._sessionId && !ended) {
// Response on first SETUP
this._sessionId = sessionId(msg.data)
const _sessionTimeout = sessionTimeout(msg.data)
this._sessionId = sessionId(content)
const _sessionTimeout = sessionTimeout(content)
if (_sessionTimeout !== null) {
// The server specified that sessions will timeout if not renewed.
// In order to keep it alive we need periodically send a RTSP_OPTIONS message
Expand All @@ -306,20 +309,20 @@ export class RtspSession extends Tube {
}

if (!this._contentBase) {
this._contentBase = contentBase(msg.data)
this._contentBase = contentBase(content)
}
if (!this._contentLocation) {
this._contentLocation = contentLocation(msg.data)
this._contentLocation = contentLocation(content)
}
if (status >= 400) {
// TODO: Retry in certain cases?
this.onError &&
this.onError(new RTSPResponseError(msg.data.toString('ascii'), status))
this.onError(new RTSPResponseError(content, status))
}

if (method === RTSP_METHOD.PLAY) {
// When starting to play, send the actual range to an external handler.
this.onPlay && this.onPlay(range(msg.data))
this.onPlay && this.onPlay(range(content))
}

if (ended) {
Expand Down
13 changes: 13 additions & 0 deletions streams/src/utils/bytes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export function concat(bufs: Uint8Array[]): Uint8Array {
let byteLength = 0
for (const b of bufs) {
byteLength += b.length
}
const buf = new Uint8Array(byteLength)
let offset = 0
for (const b of bufs) {
buf.set(b, offset)
offset += b.length
}
return buf
}
103 changes: 62 additions & 41 deletions streams/src/utils/protocols/rtsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,56 +38,59 @@
* ....
*/

export const ASCII = {
LF: 10,
CR: 13,
} as const

/**
* Extract the value of a header.
*
* @param buffer The response bytes
* @param header The header to search for
* @param key The header to search for
*/
export const extractHeaderValue = (buffer: Buffer, header: string) => {
const anchor = `\n${header.toLowerCase()}: `
const start = buffer.toString().toLowerCase().indexOf(anchor)
export const extractHeaderValue = (header: string, key: string) => {
const anchor = `\n${key.toLowerCase()}: `
const start = header.toLowerCase().indexOf(anchor)
if (start >= 0) {
const end = buffer.indexOf('\n', start + anchor.length)
const headerValue = buffer
.toString('ascii', start + anchor.length, end)
.trim()
const end = header.indexOf('\n', start + anchor.length)
const headerValue = header.substring(start + anchor.length, end).trim()
return headerValue
}
return null
}

export const sequence = (buffer: Buffer) => {
export const sequence = (header: string) => {
/**
* CSeq = "CSeq" HCOLON cseq-nr
* cseq-nr = 1*9DIGIT
*/
const val = extractHeaderValue(buffer, 'CSeq')
const val = extractHeaderValue(header, 'CSeq')
if (val !== null) {
return Number(val)
}
return null
}

export const sessionId = (buffer: Buffer) => {
export const sessionId = (header: string) => {
/**
* Session = "Session" HCOLON session-id
* [ SEMI "timeout" EQUAL delta-seconds ]
* session-id = 1*256( ALPHA / DIGIT / safe )
* delta-seconds = 1*19DIGIT
*/
const val = extractHeaderValue(buffer, 'Session')
const val = extractHeaderValue(header, 'Session')
return val ? val.split(';')[0] : null
}

export const sessionTimeout = (buffer: Buffer) => {
export const sessionTimeout = (header: string) => {
/**
* Session = "Session" HCOLON session-id
* [ SEMI "timeout" EQUAL delta-seconds ]
* session-id = 1*256( ALPHA / DIGIT / safe )
* delta-seconds = 1*19DIGIT
*/
const val = extractHeaderValue(buffer, 'Session')
const val = extractHeaderValue(header, 'Session')
if (val === null) {
return null
}
Expand All @@ -103,35 +106,35 @@ export const sessionTimeout = (buffer: Buffer) => {
return defaultTimeout
}

export const statusCode = (buffer: Buffer) => {
return Number(buffer.toString('ascii', 9, 12))
export const statusCode = (header: string) => {
return Number(header.substring(9, 12))
}

export const contentBase = (buffer: Buffer) => {
export const contentBase = (header: string) => {
/**
* Content-Base = "Content-Base" HCOLON RTSP-URI
*/
return extractHeaderValue(buffer, 'Content-Base')
return extractHeaderValue(header, 'Content-Base')
}

export const contentLocation = (buffer: Buffer) => {
export const contentLocation = (header: string) => {
/**
* Content-Location = "Content-Location" HCOLON RTSP-REQ-Ref
*/
return extractHeaderValue(buffer, 'Content-Location')
return extractHeaderValue(header, 'Content-Location')
}

export const connectionEnded = (buffer: Buffer) => {
export const connectionEnded = (header: string) => {
/**
* Connection = "Connection" HCOLON connection-token
* *(COMMA connection-token)
* connection-token = "close" / token
*/
const connectionToken = extractHeaderValue(buffer, 'Connection')
const connectionToken = extractHeaderValue(header, 'Connection')
return connectionToken !== null && connectionToken.toLowerCase() === 'close'
}

export const range = (buffer: Buffer) => {
export const range = (header: string) => {
/**
* Range = "Range" HCOLON ranges-spec
* ranges-spec = npt-range / utc-range / smpte-range
Expand All @@ -154,38 +157,56 @@ export const range = (buffer: Buffer) => {
// Example range headers:
// Range: npt=now-
// Range: npt=1154.598701-3610.259146
const npt = extractHeaderValue(buffer, 'Range')
const npt = extractHeaderValue(header, 'Range')
if (npt !== null) {
return npt.split('=')[1].split('-')
}
return undefined
}

interface HeaderTerminator {
byteLength: number
sequence: string
startByte: number
}
const headerTerminators: HeaderTerminator[] = [
// expected
{ sequence: '\r\n\r\n', startByte: ASCII.CR, byteLength: 4 },
// legacy compatibility
{ sequence: '\r\r', startByte: ASCII.CR, byteLength: 2 },
{ sequence: '\n\n', startByte: ASCII.LF, byteLength: 2 },
]

/**
* Determine the offset of the RTSP body, where the header ends.
* If there is no header ending, -1 is returned
* @param chunk - A piece of data
* @return The body offset, or -1 if no header end found
*/
export const bodyOffset = (chunk: Buffer) => {
/**
* Strictly speaking, it seems RTSP MUST have CRLF and doesn't allow CR or LF on its own.
* That means that the end of the header part should be a pair of CRLF, but we're being
* flexible here and also allow LF LF or CR CR instead of CRLF CRLF.
*/
const bodyOffsets = ['\n\n', '\r\r', '\r\n\r\n']
.map((s) => {
const offset = chunk.indexOf(s)
if (offset !== -1) {
return offset + s.length
export const bodyOffset = (chunk: Uint8Array) => {
// Strictly speaking, it seems RTSP MUST have CRLF and doesn't allow CR or LF on its own.
// That means that the end of the header part should be a pair of CRLF, but we're being
// flexible here and also allow LF LF or CR CR instead of CRLF CRLF (should be handled
// according to version 1.0)
const dec = new TextDecoder()

for (const terminator of headerTerminators) {
const terminatorOffset = chunk.findIndex((value, index, array) => {
if (value === terminator.startByte) {
const candidate = dec.decode(
array.slice(index, index + terminator.byteLength)
)

if (candidate === terminator.sequence) {
return true
}
}
return offset
})
.filter((offset) => offset !== -1)
if (bodyOffsets.length > 0) {
return bodyOffsets.reduce((acc, offset) => {
return Math.min(acc, offset)
return false
})
if (terminatorOffset !== -1) {
return terminatorOffset + terminator.byteLength
}
}

return -1
}
Loading

0 comments on commit 92f6b03

Please sign in to comment.