Skip to content

Commit

Permalink
Updated packet pool add strategy regarding errors
Browse files Browse the repository at this point in the history
  • Loading branch information
asticode committed Dec 16, 2017
1 parent 1a16e5b commit 4c9af50
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
36 changes: 33 additions & 3 deletions packet_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package astits
import (
"sort"
"sync"

"github.com/asticode/go-astilog"
)

// packetPool represents a pool of packets
Expand All @@ -21,17 +23,39 @@ func newPacketPool() *packetPool {

// add adds a new packet to the pool
func (b *packetPool) add(p *Packet) (ps []*Packet) {
// Throw away packet if error indicator
if p.Header.TransportErrorIndicator {
return
}

// Throw away packets that don't have a payload until we figure out what we're going to do with them
// TODO figure out what we're going to do with them :D
if !p.Header.HasPayload {
astilog.Debug("Removing packet without payload, needs fixing")
return
}

// Lock
b.m.Lock()
defer b.m.Unlock()

// Init buffer or empty buffer if discontinuity
// Init buffer
var mps []*Packet
var ok bool
if mps, ok = b.b[p.Header.PID]; !ok || hasDiscontinuity(mps, p) {
if mps, ok = b.b[p.Header.PID]; !ok {
mps = []*Packet{}
}

// Empty buffer if we detect a discontinuity
if hasDiscontinuity(mps, p) {
mps = []*Packet{}
}

// Throw away packet if it's the same as the previous one
if isSameAsPrevious(mps, p) {
return
}

// Add packet
if len(mps) > 0 || (len(mps) == 0 && p.Header.PayloadUnitStartIndicator) {
mps = append(mps, p)
Expand Down Expand Up @@ -70,5 +94,11 @@ func (b *packetPool) dump() (ps []*Packet) {
// hasDiscontinuity checks whether a packet is discontinuous with a set of packets
func hasDiscontinuity(ps []*Packet, p *Packet) bool {
return (p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator) ||
(len(ps) > 0 && p.Header.ContinuityCounter != (ps[len(ps)-1].Header.ContinuityCounter+1)%16)
(len(ps) > 0 && p.Header.HasPayload && p.Header.ContinuityCounter != (ps[len(ps)-1].Header.ContinuityCounter+1)%16) ||
(len(ps) > 0 && !p.Header.HasPayload && p.Header.ContinuityCounter != ps[len(ps)-1].Header.ContinuityCounter)
}

// isSameAsPrevious checks whether a packet is the same as the last packet of a set of packets
func isSameAsPrevious(ps []*Packet, p *Packet) bool {
return len(ps) > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[len(ps)-1].Header.ContinuityCounter
}
30 changes: 19 additions & 11 deletions packet_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,36 @@ import (
)

func TestHasDiscontinuity(t *testing.T) {
assert.False(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 0}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{AdaptationField: &PacketAdaptationField{DiscontinuityIndicator: true}, Header: &PacketHeader{ContinuityCounter: 0, HasAdaptationField: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1}}))
assert.False(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 0, HasPayload: true}}))
assert.False(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 15}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{AdaptationField: &PacketAdaptationField{DiscontinuityIndicator: true}, Header: &PacketHeader{ContinuityCounter: 0, HasAdaptationField: true, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
assert.True(t, hasDiscontinuity([]*Packet{{Header: &PacketHeader{ContinuityCounter: 15}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 0}}))
}

func TestIsSameAsPrevious(t *testing.T) {
assert.False(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1}}))
assert.False(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 2, HasPayload: true}}))
assert.True(t, isSameAsPrevious([]*Packet{{Header: &PacketHeader{ContinuityCounter: 1}}}, &Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true}}))
}

func TestPacketPool(t *testing.T) {
b := newPacketPool()
ps := b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 0, PID: 1}})
ps := b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 0, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, PayloadUnitStartIndicator: true, PID: 2}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 2}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 2, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 2, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 3, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 2)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 5, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 5, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 6, PayloadUnitStartIndicator: true, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 7, PID: 1}})
ps = b.add(&Packet{Header: &PacketHeader{ContinuityCounter: 7, HasPayload: true, PID: 1}})
assert.Len(t, ps, 0)
ps = b.dump()
assert.Len(t, ps, 2)
Expand Down
2 changes: 1 addition & 1 deletion packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var packetHeader = &PacketHeader{

func packetHeaderBytes(h PacketHeader) []byte {
w := astibinary.New()
w.Write("1") // Transport error indicator
w.Write(h.TransportErrorIndicator) // Transport error indicator
w.Write(h.PayloadUnitStartIndicator) // Payload unit start indicator
w.Write("1") // Transport priority
w.Write(fmt.Sprintf("%.13b", h.PID)) // PID
Expand Down

0 comments on commit 4c9af50

Please sign in to comment.