Skip to content

Commit

Permalink
fixup! chore(streams): rtsp-parser Buffer -> UInt8Array
Browse files Browse the repository at this point in the history
  • Loading branch information
steabert committed Dec 12, 2024
1 parent 8c4c788 commit a2a99eb
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 95 deletions.
5 changes: 3 additions & 2 deletions streams/src/components/rtsp-parser/builder.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import debug from 'debug'

import { encode } from 'utils/bytes'
import { RtspMessage } from '../message'

const DEFAULT_PROTOCOL = 'RTSP/1.0'

export const builder = (msg: RtspMessage): Buffer => {
export const builder = (msg: RtspMessage): Uint8Array => {
if (!msg.method || !msg.uri) {
throw new Error('message needs to contain a method and a uri')
}
Expand All @@ -20,5 +21,5 @@ export const builder = (msg: RtspMessage): Buffer => {
].join('\r\n')
debug('msl:rtsp:outgoing')(messageString)

return Buffer.from(messageString)
return encode(messageString)
}
7 changes: 3 additions & 4 deletions streams/src/components/rtsp-parser/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,17 @@ export class Parser {
data[rtspHeaderLength] === ASCII_DOLLAR
) {
// No body in this chunk, assume there is no body?
const buffer = Buffer.from(data.buffer)
const packet = buffer.slice(0, rtspHeaderLength)
const packet = data.subarray(0, rtspHeaderLength)
messages.push({ type: MessageType.RTSP, data: packet })

// Add the remaining data to the chunk stack.
const trailing = buffer.slice(rtspHeaderLength)
const trailing = data.subarray(rtspHeaderLength)
this._push(trailing)
} else {
// Body is assumed to be the remaining data of the last chunk.
const body = data.subarray(rtspHeaderLength)

messages.push({ type: MessageType.RTSP, data: Buffer.from(data.buffer) })
messages.push({ type: MessageType.RTSP, data })
messages.push(sdpFromBody(decode(body)))
}

Expand Down
26 changes: 24 additions & 2 deletions streams/src/utils/bytes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,17 @@ export function concat(bufs: Uint8Array[]): Uint8Array {
}

// Decode UTF-8 bytes
const DECODER = new TextDecoder()
export function decode(bytes: Uint8Array): string {
const dec = new TextDecoder()
return dec.decode(bytes)
// don't use streams: true option since we use a global decoder
return DECODER.decode(bytes)
}

// Encode string to UTF-8 bytes
const ENCODER = new TextEncoder()
export function encode(text: string): Uint8Array {
// don't use streams: true option since we use a global encoder
return ENCODER.encode(text)
}

// Extract 8-bit big-endian value at byte offset
Expand All @@ -37,6 +45,20 @@ export function readUInt16BE(bytes: Uint8Array, byteOffset: number): number {
).getUint16(byteOffset)
}

// Extract 24-bit big-endian value at byte offset
export function readUInt24BE(bytes: Uint8Array, byteOffset: number): number {
const dataView = new DataView(
bytes.buffer,
bytes.byteOffset,
bytes.byteLength
)
return (
(dataView.getUint8(byteOffset) << 16) +
(dataView.getUint8(byteOffset + 1) << 8) +
dataView.getUint8(byteOffset + 2)
)
}

// Extract 32-bit big-endian value at byte offset
export function readUInt32BE(bytes: Uint8Array, byteOffset: number): number {
return new DataView(
Expand Down
98 changes: 51 additions & 47 deletions streams/src/utils/protocols/rtcp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import {
readUInt8,
readUInt16BE,
readUInt24BE,
readUInt32BE,
decode,
} from 'utils/bytes'
import { MessageType, RtcpMessage } from '../../components/message'
import { POS } from '../bits'

Expand Down Expand Up @@ -29,16 +36,16 @@ export interface Rtcp {
readonly length: number
}

const parseBase = (buffer: Buffer): Rtcp => ({
const parseBase = (buffer: Uint8Array): Rtcp => ({
version: buffer[0] >>> 6,
padding: !!(buffer[0] & POS[2]),
count: buffer[0] & 0x1f,
packetType: buffer.readUInt8(1),
length: buffer.readUInt16BE(2),
packetType: readUInt8(buffer, 1),
length: readUInt16BE(buffer, 2),
})

export const parseRtcp = (
buffer: Buffer
buffer: Uint8Array
): Rtcp | RtcpSR | RtcpRR | RtcpSDES | RtcpBye | RtcpApp => {
const base = parseBase(buffer)

Expand All @@ -60,11 +67,12 @@ export const parseRtcp = (

export const rtcpMessageFromBuffer = (
channel: number,
buffer: Buffer
buffer: Uint8Array
): RtcpMessage => {
return {
type: MessageType.RTCP,
data: buffer,
// FIXME: update after GenericMessage uses Uint8Array for data
data: Buffer.from(buffer),
channel,
rtcp: parseRtcp(buffer),
}
Expand Down Expand Up @@ -126,20 +134,20 @@ export interface RtcpReportBlock {

const parseReportBlocks = (
count: number,
buffer: Buffer,
buffer: Uint8Array,
offset: number
): RtcpReportBlock[] => {
const reports: RtcpReportBlock[] = []
for (let reportNumber = 0; reportNumber < count; reportNumber++) {
const o = offset + reportNumber * 24
reports.push({
syncSource: buffer.readUInt32BE(o + 0),
fractionLost: buffer.readUInt8(o + 4),
cumulativeNumberOfPacketsLost: buffer.readUIntBE(o + 5, 3),
extendedHighestSequenceNumberReceived: buffer.readUInt32BE(o + 8),
interarrivalJitter: buffer.readUInt32BE(o + 12),
lastSRTimestamp: buffer.readUInt32BE(o + 16),
delaySinceLastSR: buffer.readUInt32BE(o + 20),
syncSource: readUInt32BE(buffer, o + 0),
fractionLost: readUInt8(buffer, o + 4),
cumulativeNumberOfPacketsLost: readUInt24BE(buffer, o + 5),
extendedHighestSequenceNumberReceived: readUInt32BE(buffer, o + 8),
interarrivalJitter: readUInt32BE(buffer, o + 12),
lastSRTimestamp: readUInt32BE(buffer, o + 16),
delaySinceLastSR: readUInt32BE(buffer, o + 20),
})
}
return reports
Expand All @@ -157,14 +165,14 @@ export interface RtcpSR extends Rtcp {
readonly reports: readonly RtcpReportBlock[]
}

const parseSR = (buffer: Buffer, base: Rtcp): RtcpSR => ({
const parseSR = (buffer: Uint8Array, base: Rtcp): RtcpSR => ({
...base,
syncSource: buffer.readUInt32BE(4),
ntpMost: buffer.readUInt32BE(8),
ntpLeast: buffer.readUInt32BE(12),
rtpTimestamp: buffer.readUInt32BE(16),
sendersPacketCount: buffer.readUInt32BE(20),
sendersOctetCount: buffer.readUInt32BE(24),
syncSource: readUInt32BE(buffer, 4),
ntpMost: readUInt32BE(buffer, 8),
ntpLeast: readUInt32BE(buffer, 12),
rtpTimestamp: readUInt32BE(buffer, 16),
sendersPacketCount: readUInt32BE(buffer, 20),
sendersOctetCount: readUInt32BE(buffer, 24),
reports: parseReportBlocks(base.count, buffer, 28),
})

Expand Down Expand Up @@ -208,9 +216,9 @@ export interface RtcpRR extends Rtcp {
readonly reports: readonly RtcpReportBlock[]
}

const parseRR = (buffer: Buffer, base: Rtcp): RtcpRR => ({
const parseRR = (buffer: Uint8Array, base: Rtcp): RtcpRR => ({
...base,
syncSource: buffer.readUInt32BE(4),
syncSource: readUInt32BE(buffer, 4),
reports: parseReportBlocks(base.count, buffer, 8),
})

Expand Down Expand Up @@ -260,18 +268,18 @@ export interface RtcpSDES extends Rtcp {
readonly sourceDescriptions: readonly RtcpSDESBlock[]
}

const parseSDES = (buffer: Buffer, base: Rtcp): RtcpSDES => {
const parseSDES = (buffer: Uint8Array, base: Rtcp): RtcpSDES => {
const sourceDescriptions: RtcpSDESBlock[] = []
let offset = 4
for (let block = 0; block < base.count; block++) {
const chunk: RtcpSDESBlock = {
source: buffer.readUInt32BE(offset),
source: readUInt32BE(buffer, offset),
items: [],
}
offset += 4

while (true) {
const itemType = buffer.readUInt8(offset++)
const itemType = readUInt8(buffer, offset++)

if (itemType === 0) {
// start next block at word boundary
Expand All @@ -281,23 +289,19 @@ const parseSDES = (buffer: Buffer, base: Rtcp): RtcpSDES => {
break
}

const length = buffer.readUInt8(offset++)
const length = readUInt8(buffer, offset++)

if (itemType === SDESItem.PRIV) {
const prefixLength = buffer.readUInt8(offset)
const prefix = buffer.toString(
'utf8',
offset + 1,
offset + 1 + prefixLength
const prefixLength = readUInt8(buffer, offset)
const prefix = decode(
buffer.subarray(offset + 1, offset + 1 + prefixLength)
)
const value = buffer.toString(
'utf8',
offset + 1 + prefixLength,
offset + length
const value = decode(
buffer.subarray(offset + 1 + prefixLength, offset + length)
)
chunk.items.push([SDESItem.PRIV, prefix, value])
} else {
const value = buffer.toString('utf8', offset, offset + length)
const value = decode(buffer.subarray(offset, offset + length))
chunk.items.push([itemType, value])
}

Expand All @@ -308,7 +312,7 @@ const parseSDES = (buffer: Buffer, base: Rtcp): RtcpSDES => {

return {
...base,
syncSource: buffer.readUInt32BE(4),
syncSource: readUInt32BE(buffer, 4),
sourceDescriptions,
}
}
Expand Down Expand Up @@ -339,17 +343,17 @@ export interface RtcpBye extends Rtcp {
readonly reason?: string
}

const parseBYE = (buffer: Buffer, base: Rtcp): RtcpBye => {
const parseBYE = (buffer: Uint8Array, base: Rtcp): RtcpBye => {
const sources: number[] = []
for (let block = 0; block < base.count; block++) {
sources.push(buffer.readUInt32BE(4 + 4 * block))
sources.push(readUInt32BE(buffer, 4 + 4 * block))
}

let reason
if (base.length > base.count) {
const start = 4 + 4 * base.count
const length = buffer.readUInt8(start)
reason = buffer.toString('utf-8', start + 1, start + 1 + length)
const length = readUInt8(buffer, start)
reason = decode(buffer.subarray(start + 1, start + 1 + length))
}

return {
Expand Down Expand Up @@ -385,16 +389,16 @@ export interface RtcpApp extends Rtcp {
readonly subtype: number
readonly source: number
readonly name: string
readonly data: Buffer
readonly data: Uint8Array
}

const parseAPP = (buffer: Buffer, base: Rtcp): RtcpApp => {
const parseAPP = (buffer: Uint8Array, base: Rtcp): RtcpApp => {
return {
...base,
subtype: base.count,
source: buffer.readUInt32BE(4),
name: buffer.toString('ascii', 8, 12),
data: buffer.slice(12),
source: readUInt32BE(buffer, 4),
name: decode(buffer.subarray(8, 12)),
data: buffer.subarray(12),
}
}

Expand Down
1 change: 1 addition & 0 deletions streams/src/utils/protocols/sdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ export const parse = (body: string): Sdp => {
export const sdpFromBody = (body: string): SdpMessage => {
return {
type: MessageType.SDP,
// FIXME: update after GenericMessage uses Uint8Array for data
data: Buffer.alloc(0),
sdp: parse(body),
}
Expand Down
2 changes: 1 addition & 1 deletion streams/tests/protocols-rtcp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ describe('Rtcp parsing', (test) => {
subtype: 5,
source: 42,
name: 'Life',
data: Buffer.from([0, 1, 2, 3, 42, 42, 42, 42]),
data: new Uint8Array([0, 1, 2, 3, 42, 42, 42, 42]),
})
})
})
2 changes: 1 addition & 1 deletion streams/tests/protocols-rtsp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ describe('connectionEnded', (test) => {
describe('bodyOffset', (test) => {
test('should return the lowest index of all possible line breaks', () => {
const bodyWithLinebreaks = '\r\r<svg>\r\n\r\n</svg>\n\n'
const buf = Buffer.alloc(setupResponse.length + bodyWithLinebreaks.length)
const buf = new Uint8Array(setupResponse.length + bodyWithLinebreaks.length)
setupResponse.split('').forEach((character, index) => {
buf[index] = character.charCodeAt(0)
})
Expand Down
20 changes: 10 additions & 10 deletions streams/tests/protocols.fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ export const rtpBuffersWithHeaderExt = [
/* biome-ignore format: custom formatting */
export const rtcpSRBuffers = [
// 0 reports
Buffer.from([
new Uint8Array([
128, 200, 0, 6, 243, 203, 32, 1, 131, 171, 3, 161, 235, 2, 11, 58, 0, 0,
148, 32, 0, 0, 0, 158, 0, 0, 155, 136,
]),

// 3 reports
Buffer.from([
new Uint8Array([
131, 200, 0, 24, 243, 203, 32, 1, 131, 171, 3, 161, 235, 2, 11, 58, 0, 0,
148, 32, 0, 0, 0, 158, 0, 0, 155, 136, 0, 0, 0, 1, 4, 0, 0, 10, 0, 0, 0,
1000, 0, 0, 0, 5, 0, 0, 0, 6, 0, 0, 0, 7, 0, 0, 0, 2, 4, 0, 0, 11, 0, 0, 0,
Expand All @@ -39,10 +39,10 @@ export const rtcpSRBuffers = [
/* biome-ignore format: custom formatting */
export const rtcpRRBuffers = [
// 0 reports
Buffer.from([128, 201, 0, 1, 27, 117, 249, 76]),
new Uint8Array([128, 201, 0, 1, 27, 117, 249, 76]),

// 3 reports
Buffer.from([
new Uint8Array([
131, 201, 0, 19, 27, 117, 249, 76, 0, 0, 0, 1, 4, 0, 0, 10, 0, 0, 0, 1000,
0, 0, 0, 5, 0, 0, 0, 6, 0, 0, 0, 7, 0, 0, 0, 2, 4, 0, 0, 11, 0, 0, 0, 1001,
0, 0, 0, 8, 0, 0, 0, 9, 0, 0, 0, 10, 0, 0, 0, 3, 4, 0, 0, 12, 0, 0, 0, 1002,
Expand All @@ -52,14 +52,14 @@ export const rtcpRRBuffers = [

/* biome-ignore format: custom formatting */
export const rtcpSDESBuffers = [
Buffer.from([
new Uint8Array([
129, 202, 0, 12, 217, 157, 189, 215, 1, 28, 117, 115, 101, 114, 50, 53, 48,
51, 49, 52, 53, 55, 54, 54, 64, 104, 111, 115, 116, 45, 50, 57, 50, 48, 53,
57, 53, 50, 6, 9, 71, 83, 116, 114, 101, 97, 109, 101, 114, 0, 0, 0,
]),

// 2 chunks (1+2 priv)
Buffer.from([
new Uint8Array([
130,
202,
0,
Expand Down Expand Up @@ -117,21 +117,21 @@ export const rtcpSDESBuffers = [

/* biome-ignore format: custom formatting */
export const rtcpBYEBuffers = [
Buffer.from([129, 203, 0, 1, 38, 197, 204, 95]),
new Uint8Array([129, 203, 0, 1, 38, 197, 204, 95]),

// 0 byes (valid, but useless)
Buffer.from([128, 203, 0, 0]),
new Uint8Array([128, 203, 0, 0]),

// 3 byes + reason (valid, but useless)
Buffer.from([
new Uint8Array([
131, 203, 0, 5, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 4, 76, 111, 115, 116, 0,
0, 0,
]),
]

/* biome-ignore format: custom formatting */
export const rtcpAPPBuffers = [
Buffer.from([
new Uint8Array([
133, 204, 0, 4, 0, 0, 0, 42, 76, 105, 102, 101, 0, 1, 2, 3, 42, 42, 42, 42,
]),
]
Expand Down
Loading

0 comments on commit a2a99eb

Please sign in to comment.