Skip to content

Commit

Permalink
Packet pool optimisations (#44)
Browse files Browse the repository at this point in the history
* Bytewise crc32

* Bump go-astikit to 0.30. Make crc32 generator. Remove old crc32 calculation func and corresponding tests/benchmarks.

* Replace OpenFile with Create in crc32 generator. Some minor changes

* Introduce isPSIComplete function. In packetAccumulator add method make new slices with capacity of previous one.

Co-authored-by: Danil Korymov <[email protected]>
  • Loading branch information
k-danil and k-danil authored Dec 29, 2022
1 parent 34ae124 commit b10d419
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
54 changes: 54 additions & 0 deletions data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package astits

import (
"encoding/binary"
"fmt"

"github.com/asticode/go-astikit"
Expand Down Expand Up @@ -115,3 +116,56 @@ func isPESPayload(i []byte) bool {
// Check prefix
return uint32(i[0])<<16|uint32(i[1])<<8|uint32(i[2]) == 1
}

// isPSIComplete checks whether we have sufficient amount of packets to parse PSI
func isPSIComplete(ps []*Packet) bool {
// Get payload length
var l int
for _, p := range ps {
l += len(p.Payload)
}

// Append payload
var payload = make([]byte, l)
var o int
for _, p := range ps {
o += copy(payload[o:], p.Payload)
}

// Create reader
i := astikit.NewBytesIterator(payload)

// Get next byte
b, err := i.NextByte()
if err != nil {
return false
}

// Pointer filler bytes
i.Skip(int(b))

for i.HasBytesLeft() {

// Get PSI table ID
b, err = i.NextByte()
if err != nil {
return false
}

// Check whether we need to stop the parsing
if shouldStopPSIParsing(PSITableID(b)) {
break
}

// Get PSI section length
var bs []byte
bs, err = i.NextBytesNoCopy(2)
if err != nil {
return false
}

i.Skip(int(binary.BigEndian.Uint16(bs) & 0x0fff))
}

return i.Len() >= i.Offset()
}
18 changes: 8 additions & 10 deletions packet_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {

// Empty buffer if we detect a discontinuity
if hasDiscontinuity(mps, p) {
mps = []*Packet{}
mps = make([]*Packet, 0, cap(mps))
}

// Throw away packet if it's the same as the previous one
Expand All @@ -39,19 +39,17 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
// Flush buffer if new payload starts here
if p.Header.PayloadUnitStartIndicator {
ps = mps
mps = []*Packet{p}
} else {
mps = append(mps, p)
mps = make([]*Packet, 0, cap(mps))
}

mps = append(mps, p)

// Check if PSI payload is complete
if b.programMap != nil &&
(b.pid == PIDPAT || b.programMap.exists(b.pid)) {
// TODO Use partial data parsing instead
if _, err := parseData(mps, b.parser, b.programMap); err == nil {
ps = mps
mps = nil
}
(b.pid == PIDPAT || b.programMap.exists(b.pid)) &&
isPSIComplete(mps) {
ps = mps
mps = nil
}

b.q = mps
Expand Down

0 comments on commit b10d419

Please sign in to comment.