Skip to content

Commit

Permalink
Now dumping last packets on end of stream
Browse files Browse the repository at this point in the history
  • Loading branch information
asticode committed Aug 9, 2019
1 parent 8d58895 commit af4a358
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
57 changes: 41 additions & 16 deletions demuxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,26 @@ func (dmx *Demuxer) NextData() (d *Data, err error) {
for {
// Get next packet
if p, err = dmx.NextPacket(); err != nil {
// We don't dump the packet pool since we don't want incomplete data
// If the end of the stream has been reached, we dump the packet pool
if err == ErrNoMorePackets {
for {
// Dump packet pool
if ps = dmx.packetPool.dump(); len(ps) == 0 {
break
}

// Parse data
if ds, err = parseData(ps, dmx.optPacketsParser, dmx.programMap); err != nil {
// We need to silence this error as there may be some incomplete data here
// We still want to try to parse all packets, in case final data is complete
continue
}

// Update data
if d = dmx.updateData(ds); d != nil {
return
}
}
return
}
err = errors.Wrap(err, "astits: fetching next packet failed")
Expand All @@ -128,26 +146,33 @@ func (dmx *Demuxer) NextData() (d *Data, err error) {
return
}

// Check whether there is data to be processed
if len(ds) > 0 {
// Process data
d = ds[0]
dmx.dataBuffer = append(dmx.dataBuffer, ds[1:]...)

// Update program map
for _, v := range ds {
if v.PAT != nil {
for _, pgm := range v.PAT.Programs {
// Program number 0 is reserved to NIT
if pgm.ProgramNumber > 0 {
dmx.programMap.set(pgm.ProgramMapID, pgm.ProgramNumber)
}
// Update data
if d = dmx.updateData(ds); d != nil {
return
}
}
}

func (dmx *Demuxer) updateData(ds []*Data) (d *Data) {
// Check whether there is data to be processed
if len(ds) > 0 {
// Process data
d = ds[0]
dmx.dataBuffer = append(dmx.dataBuffer, ds[1:]...)

// Update program map
for _, v := range ds {
if v.PAT != nil {
for _, pgm := range v.PAT.Programs {
// Program number 0 is reserved to NIT
if pgm.ProgramNumber > 0 {
dmx.programMap.set(pgm.ProgramMapID, pgm.ProgramNumber)
}
}
}
return
}
}
return
}

// Rewind rewinds the demuxer reader
Expand Down
3 changes: 0 additions & 3 deletions demuxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ func TestDemuxerNextData(t *testing.T) {
w.Write(b1)
b2, _ := packet(PacketHeader{ContinuityCounter: uint8(1), PID: PIDPAT}, PacketAdaptationField{}, b[147:])
w.Write(b2)
// We write this additional empty packet since we don't dump the packet pool anymore
b3, _ := packet(PacketHeader{ContinuityCounter: uint8(2), PayloadUnitStartIndicator: true, PID: PIDPAT}, PacketAdaptationField{}, []byte{})
w.Write(b3)
dmx := New(context.Background(), bytes.NewReader(w.Bytes()))
p, err := dmx.NextPacket()
assert.NoError(t, err)
Expand Down

0 comments on commit af4a358

Please sign in to comment.