Skip to content

Commit

Permalink
Packet optimisation and small tweaks (#45)
Browse files Browse the repository at this point in the history
* Bytewise crc32

* Bump go-astikit to 0.30. Make crc32 generator. Remove old crc32 calculation func and corresponding tests/benchmarks.

* Replace OpenFile with Create in crc32 generator. Some minor changes

* PacketHeader passed by value everywhere. Restructuring PacketAdaptationField to minimise overhead on alignment. Return custom parser in isPSIComplete. isPSIComplete & parseData small tweaks. In packetPool.add replace two map access with one. In hasDiscontinuity & isSameAsPrevious call len(ps) ones. In programMap use RWMutex.

* Changes in copying process

* Rollback copy optimisations and unnecessary type conversion

* Cut out PacketsParser from packetPool, packetAccumulator and isPSIComplete.

Co-authored-by: Danil Korymov <[email protected]>
  • Loading branch information
k-danil and k-danil authored Jan 18, 2023
1 parent b10d419 commit db51df8
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 76 deletions.
10 changes: 5 additions & 5 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestParseData(t *testing.T) {
assert.Equal(t, cds, ds)

// Do nothing for CAT
ps = []*Packet{{Header: &PacketHeader{PID: PIDCAT}}}
ps = []*Packet{{Header: PacketHeader{PID: PIDCAT}}}
ds, err = parseData(ps, nil, pm)
assert.NoError(t, err)
assert.Empty(t, ds)
Expand All @@ -34,11 +34,11 @@ func TestParseData(t *testing.T) {
p := pesWithHeaderBytes()
ps = []*Packet{
{
Header: &PacketHeader{PID: uint16(256)},
Header: PacketHeader{PID: uint16(256)},
Payload: p[:33],
},
{
Header: &PacketHeader{PID: uint16(256)},
Header: PacketHeader{PID: uint16(256)},
Payload: p[33:],
},
}
Expand All @@ -51,11 +51,11 @@ func TestParseData(t *testing.T) {
p = psiBytes()
ps = []*Packet{
{
Header: &PacketHeader{PID: uint16(256)},
Header: PacketHeader{PID: uint16(256)},
Payload: p[:33],
},
{
Header: &PacketHeader{PID: uint16(256)},
Header: PacketHeader{PID: uint16(256)},
Payload: p[33:],
},
}
Expand Down
4 changes: 2 additions & 2 deletions demuxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewDemuxer(ctx context.Context, r io.Reader, opts ...func(*Demuxer)) (d *De
programMap: newProgramMap(),
r: r,
}
d.packetPool = newPacketPool(d.optPacketsParser, d.programMap)
d.packetPool = newPacketPool(d.programMap)

// Apply options
for _, opt := range opts {
Expand Down Expand Up @@ -194,7 +194,7 @@ func (dmx *Demuxer) updateData(ds []*DemuxerData) (d *DemuxerData) {
func (dmx *Demuxer) Rewind() (n int64, err error) {
dmx.dataBuffer = []*DemuxerData{}
dmx.packetBuffer = nil
dmx.packetPool = newPacketPool(dmx.optPacketsParser, dmx.programMap)
dmx.packetPool = newPacketPool(dmx.programMap)
if n, err = rewind(dmx.r); err != nil {
err = fmt.Errorf("astits: rewinding reader failed: %w", err)
return
Expand Down
6 changes: 3 additions & 3 deletions demuxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func TestDemuxerNextPacket(t *testing.T) {
// Valid
buf := &bytes.Buffer{}
w := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: buf})
b1, p1 := packet(*packetHeader, *packetAdaptationField, []byte("1"), true)
b1, p1 := packet(packetHeader, *packetAdaptationField, []byte("1"), true)
w.Write(b1)
b2, p2 := packet(*packetHeader, *packetAdaptationField, []byte("2"), true)
b2, p2 := packet(packetHeader, *packetAdaptationField, []byte("2"), true)
w.Write(b2)
dmx = NewDemuxer(context.Background(), bytes.NewReader(buf.Bytes()))

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestDemuxerNextDataPATPMT(t *testing.T) {
func TestDemuxerRewind(t *testing.T) {
r := bytes.NewReader([]byte("content"))
dmx := NewDemuxer(context.Background(), r)
dmx.packetPool.add(&Packet{Header: &PacketHeader{PID: 1}})
dmx.packetPool.add(&Packet{Header: PacketHeader{PID: 1}})
dmx.dataBuffer = append(dmx.dataBuffer, &DemuxerData{})
b := make([]byte, 2)
_, err := r.Read(b)
Expand Down
6 changes: 3 additions & 3 deletions muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (m *Muxer) WriteData(d *MuxerData) (int, error) {
for payloadBytesWritten < len(d.PES.Data) {
pktLen := 1 + mpegTsPacketHeaderSize // sync byte + header
pkt := Packet{
Header: &PacketHeader{
Header: PacketHeader{
ContinuityCounter: uint8(ctx.cc.inc()),
HasAdaptationField: writeAf,
HasPayload: false,
Expand Down Expand Up @@ -360,7 +360,7 @@ func (m *Muxer) generatePAT() error {
wPacket := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: &m.patBytes})

pkt := Packet{
Header: &PacketHeader{
Header: PacketHeader{
HasPayload: true,
PayloadUnitStartIndicator: true,
PID: PIDPAT,
Expand Down Expand Up @@ -428,7 +428,7 @@ func (m *Muxer) generatePMT() error {
wPacket := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: &m.pmtBytes})

pkt := Packet{
Header: &PacketHeader{
Header: PacketHeader{
HasPayload: true,
PayloadUnitStartIndicator: true,
PID: pmtStartPID, // FIXME multiple programs support
Expand Down
31 changes: 15 additions & 16 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
// https://en.wikipedia.org/wiki/MPEG_transport_stream
type Packet struct {
AdaptationField *PacketAdaptationField
Header *PacketHeader
Header PacketHeader
Payload []byte // This is only the payload content
}

Expand All @@ -42,22 +42,22 @@ type PacketHeader struct {
// PacketAdaptationField represents a packet adaptation field
type PacketAdaptationField struct {
AdaptationExtensionField *PacketAdaptationExtensionField
OPCR *ClockReference // Original Program clock reference. Helps when one TS is copied into another
PCR *ClockReference // Program clock reference
TransportPrivateData []byte
TransportPrivateDataLength int
Length int
StuffingLength int // Only used in writePacketAdaptationField to request stuffing
SpliceCountdown int // Indicates how many TS packets from this one a splicing point occurs (Two's complement signed; may be negative)
IsOneByteStuffing bool // Only used for one byte stuffing - if true, adaptation field will be written as one uint8(0). Not part of TS format
RandomAccessIndicator bool // Set when the stream may be decoded without errors from this point
DiscontinuityIndicator bool // Set if current TS packet is in a discontinuity state with respect to either the continuity counter or the program clock reference
ElementaryStreamPriorityIndicator bool // Set when this stream should be considered "high priority"
HasAdaptationExtensionField bool
HasOPCR bool
HasPCR bool
HasTransportPrivateData bool
HasSplicingCountdown bool
Length int
IsOneByteStuffing bool // Only used for one byte stuffing - if true, adaptation field will be written as one uint8(0). Not part of TS format
StuffingLength int // Only used in writePacketAdaptationField to request stuffing
OPCR *ClockReference // Original Program clock reference. Helps when one TS is copied into another
PCR *ClockReference // Program clock reference
RandomAccessIndicator bool // Set when the stream may be decoded without errors from this point
SpliceCountdown int // Indicates how many TS packets from this one a splicing point occurs (Two's complement signed; may be negative)
TransportPrivateDataLength int
TransportPrivateData []byte
}

// PacketAdaptationExtensionField represents a packet adaptation extension field
Expand Down Expand Up @@ -118,7 +118,7 @@ func parsePacket(i *astikit.BytesIterator) (p *Packet, err error) {
}

// payloadOffset returns the payload offset
func payloadOffset(offsetStart int, h *PacketHeader, a *PacketAdaptationField) (offset int) {
func payloadOffset(offsetStart int, h PacketHeader, a *PacketAdaptationField) (offset int) {
offset = offsetStart + 3
if h.HasAdaptationField {
offset += 1 + a.Length
Expand All @@ -127,7 +127,7 @@ func payloadOffset(offsetStart int, h *PacketHeader, a *PacketAdaptationField) (
}

// parsePacketHeader parses the packet header
func parsePacketHeader(i *astikit.BytesIterator) (h *PacketHeader, err error) {
func parsePacketHeader(i *astikit.BytesIterator) (h PacketHeader, err error) {
// Get next bytes
var bs []byte
if bs, err = i.NextBytesNoCopy(3); err != nil {
Expand All @@ -136,7 +136,7 @@ func parsePacketHeader(i *astikit.BytesIterator) (h *PacketHeader, err error) {
}

// Create header
h = &PacketHeader{
return PacketHeader{
ContinuityCounter: uint8(bs[2] & 0xf),
HasAdaptationField: bs[2]&0x20 > 0,
HasPayload: bs[2]&0x10 > 0,
Expand All @@ -145,8 +145,7 @@ func parsePacketHeader(i *astikit.BytesIterator) (h *PacketHeader, err error) {
TransportErrorIndicator: bs[0]&0x80 > 0,
TransportPriority: bs[0]&0x20 > 0,
TransportScramblingControl: uint8(bs[2]) >> 6 & 0x3,
}
return
}, nil
}

// parsePacketAdaptationField parses the packet adaptation field
Expand Down Expand Up @@ -361,7 +360,7 @@ func writePacket(w *astikit.BitsWriter, p *Packet, targetPacketSize int) (writte
return written, nil
}

func writePacketHeader(w *astikit.BitsWriter, h *PacketHeader) (written int, retErr error) {
func writePacketHeader(w *astikit.BitsWriter, h PacketHeader) (written int, retErr error) {
b := astikit.NewBitsWriterBatch(w)

b.Write(h.TransportErrorIndicator)
Expand Down
25 changes: 12 additions & 13 deletions packet_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ import (

// packetAccumulator keeps track of packets for a single PID and decides when to flush them
type packetAccumulator struct {
parser PacketsParser
pid uint16
programMap *programMap
q []*Packet
}

// newPacketAccumulator creates a new packet queue for a single PID
func newPacketAccumulator(pid uint16, parser PacketsParser, programMap *programMap) *packetAccumulator {
func newPacketAccumulator(pid uint16, programMap *programMap) *packetAccumulator {
return &packetAccumulator{
parser: parser,
pid: pid,
programMap: programMap,
}
Expand Down Expand Up @@ -61,17 +59,15 @@ type packetPool struct {
b map[uint16]*packetAccumulator // Indexed by PID
m *sync.Mutex

parser PacketsParser
programMap *programMap
}

// newPacketPool creates a new packet pool with an optional parser and programMap
func newPacketPool(parser PacketsParser, programMap *programMap) *packetPool {
func newPacketPool(programMap *programMap) *packetPool {
return &packetPool{
b: make(map[uint16]*packetAccumulator),
m: &sync.Mutex{},

parser: parser,
programMap: programMap,
}
}
Expand All @@ -94,12 +90,14 @@ func (b *packetPool) add(p *Packet) (ps []*Packet) {
defer b.m.Unlock()

// Make sure accumulator exists
if _, ok := b.b[p.Header.PID]; !ok {
b.b[p.Header.PID] = newPacketAccumulator(p.Header.PID, b.parser, b.programMap)
acc, ok := b.b[p.Header.PID]
if !ok {
acc = newPacketAccumulator(p.Header.PID, b.programMap)
b.b[p.Header.PID] = acc
}

// Add to the accumulator
return b.b[p.Header.PID].add(p)
return acc.add(p)
}

// dump dumps the packet pool by looking for the first item with packets inside
Expand All @@ -123,12 +121,13 @@ func (b *packetPool) dump() (ps []*Packet) {

// hasDiscontinuity checks whether a packet is discontinuous with a set of packets
func hasDiscontinuity(ps []*Packet, p *Packet) bool {
return (p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator) ||
(len(ps) > 0 && p.Header.HasPayload && p.Header.ContinuityCounter != (ps[len(ps)-1].Header.ContinuityCounter+1)%16) ||
(len(ps) > 0 && !p.Header.HasPayload && p.Header.ContinuityCounter != ps[len(ps)-1].Header.ContinuityCounter)
l := len(ps)
return (p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator) || (l > 0 && ((p.Header.HasPayload && p.Header.ContinuityCounter != (ps[l-1].Header.ContinuityCounter+1)%16) ||
(!p.Header.HasPayload && p.Header.ContinuityCounter != ps[l-1].Header.ContinuityCounter)))
}

// isSameAsPrevious checks whether a packet is the same as the last packet of a set of packets
func isSameAsPrevious(ps []*Packet, p *Packet) bool {
return len(ps) > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[len(ps)-1].Header.ContinuityCounter
l := len(ps)
return l > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[l-1].Header.ContinuityCounter
}
34 changes: 17 additions & 17 deletions packet_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,36 @@ import (
)

func TestHasDiscontinuity(t *testing.T) {
assert.False(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 0, HasPayload: true}}))
assert.False(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 15}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{AdaptationField: &PacketAdaptationField{DiscontinuityIndicator: true}, Header: &PacketHeader{ContinuityCounter: 0, HasAdaptationField: true, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 0}}))
assert.False(t, hasDiscontinuity([]*Packet{{Header: PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true}}))
assert.False(t, hasDiscontinuity([]*Packet{{Header: PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: PacketHeader{ContinuityCounter: 15}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: PacketHeader{ContinuityCounter: 15}}}, &Packet{AdaptationField: &PacketAdaptationField{DiscontinuityIndicator: true}, Header: PacketHeader{ContinuityCounter: 0, HasAdaptationField: true, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: PacketHeader{ContinuityCounter: 0}}))
}

func TestIsSameAsPrevious(t *testing.T) {
assert.False(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1}}))
assert.False(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 2, HasPayload: true}}))
assert.True(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
assert.False(t, isSameAsPrevious([]*Packet{{Header: PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: PacketHeader{ContinuityCounter: 1}}))
assert.False(t, isSameAsPrevious([]*Packet{{Header: PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true}}))
assert.True(t, isSameAsPrevious([]*Packet{{Header: PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
}

func TestPacketPool(t *testing.T) {
b := newPacketPool(nil, nil)
ps := b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 0, HasPayload: true, PID: 1}})
b := newPacketPool(nil)
ps := b.add(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 1)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 2}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 2}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 2, HasPayload: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 2)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 5, HasPayload: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 5, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 1)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 7, HasPayload: true, PID: 1}})
ps = b.add(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.dump()
assert.Len(t, ps, 2)
Expand Down
20 changes: 10 additions & 10 deletions packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func packetShort(h PacketHeader, payload []byte) ([]byte, *Packet) {
p := append(payload, bytes.Repeat([]byte{0}, MpegTsPacketSize-buf.Len())...)
w.Write(p)
return buf.Bytes(), &Packet{
Header: &h,
Header: h,
Payload: payload,
}
}
Expand All @@ -49,19 +49,19 @@ func TestParsePacket(t *testing.T) {
assert.EqualError(t, err, ErrPacketMustStartWithASyncByte.Error())

// Valid
b, ep := packet(*packetHeader, *packetAdaptationField, []byte("payload"), true)
b, ep := packet(packetHeader, *packetAdaptationField, []byte("payload"), true)
p, err := parsePacket(astikit.NewBytesIterator(b))
assert.NoError(t, err)
assert.Equal(t, p, ep)
}

func TestPayloadOffset(t *testing.T) {
assert.Equal(t, 3, payloadOffset(0, &PacketHeader{}, nil))
assert.Equal(t, 7, payloadOffset(1, &PacketHeader{HasAdaptationField: true}, &PacketAdaptationField{Length: 2}))
assert.Equal(t, 3, payloadOffset(0, PacketHeader{}, nil))
assert.Equal(t, 7, payloadOffset(1, PacketHeader{HasAdaptationField: true}, &PacketAdaptationField{Length: 2}))
}

func TestWritePacket(t *testing.T) {
eb, ep := packet(*packetHeader, *packetAdaptationField, []byte("payload"), false)
eb, ep := packet(packetHeader, *packetAdaptationField, []byte("payload"), false)
buf := &bytes.Buffer{}
w := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: buf})
n, err := writePacket(w, ep, MpegTsPacketSize)
Expand All @@ -73,7 +73,7 @@ func TestWritePacket(t *testing.T) {
}

func TestWritePacket_HeaderOnly(t *testing.T) {
shortPacketHeader := *packetHeader
shortPacketHeader := packetHeader
shortPacketHeader.HasPayload = false
shortPacketHeader.HasAdaptationField = false
_, ep := packetShort(shortPacketHeader, nil)
Expand All @@ -94,7 +94,7 @@ func TestWritePacket_HeaderOnly(t *testing.T) {
assert.Equal(t, ep, p)
}

var packetHeader = &PacketHeader{
var packetHeader = PacketHeader{
ContinuityCounter: 10,
HasAdaptationField: true,
HasPayload: true,
Expand All @@ -119,7 +119,7 @@ func packetHeaderBytes(h PacketHeader, afControl string) []byte {
}

func TestParsePacketHeader(t *testing.T) {
v, err := parsePacketHeader(astikit.NewBytesIterator(packetHeaderBytes(*packetHeader, "11")))
v, err := parsePacketHeader(astikit.NewBytesIterator(packetHeaderBytes(packetHeader, "11")))
assert.Equal(t, packetHeader, v)
assert.NoError(t, err)
}
Expand All @@ -131,7 +131,7 @@ func TestWritePacketHeader(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, bytesWritten, 3)
assert.Equal(t, bytesWritten, buf.Len())
assert.Equal(t, packetHeaderBytes(*packetHeader, "11"), buf.Bytes())
assert.Equal(t, packetHeaderBytes(packetHeader, "11"), buf.Bytes())
}

var packetAdaptationField = &PacketAdaptationField{
Expand Down Expand Up @@ -254,7 +254,7 @@ func BenchmarkWritePCR(b *testing.B) {
}

func BenchmarkParsePacket(b *testing.B) {
bs, _ := packet(*packetHeader, *packetAdaptationField, []byte("payload"), true)
bs, _ := packet(packetHeader, *packetAdaptationField, []byte("payload"), true)

for i := 0; i < b.N; i++ {
b.ReportAllocs()
Expand Down
Loading

0 comments on commit db51df8

Please sign in to comment.