-
Notifications
You must be signed in to change notification settings - Fork 51
/
packet_buffer.go
148 lines (131 loc) · 3.58 KB
/
packet_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package astits
import (
"bufio"
"errors"
"fmt"
"io"
"github.com/asticode/go-astikit"
)
// packetBuffer reads fixed-size packets from an io.Reader and parses them one
// at a time (see next).
type packetBuffer struct {
// packetSize is the number of bytes read from r for each packet. When 0 is
// passed to newPacketBuffer, it is replaced by an auto-detected value.
packetSize int
// s is forwarded as-is to parsePacket for every packet.
// NOTE(review): presumably decides which packets get skipped — confirm
// against parsePacket.
s PacketSkipper
// r is the underlying source the packets are read from.
r io.Reader
// packetReadBuffer is the scratch buffer reused between calls to next; it is
// (re)allocated whenever its length does not match packetSize.
packetReadBuffer []byte
}
// newPacketBuffer builds a packetBuffer reading from r and handing s to the
// packet parser.
//
// A packetSize of 0 means "unknown": in that case the size is auto-detected
// from the first bytes of r. If auto-detection fails, the wrapped error is
// returned along with the partially initialised buffer.
func newPacketBuffer(r io.Reader, packetSize int, s PacketSkipper) (pb *packetBuffer, err error) {
	pb = &packetBuffer{
		r:          r,
		s:          s,
		packetSize: packetSize,
	}

	// An explicit packet size needs no further work
	if pb.packetSize != 0 {
		return pb, nil
	}

	// Packet size is not set, auto detect it
	detected, derr := autoDetectPacketSize(r)
	pb.packetSize = detected
	if derr != nil {
		return pb, fmt.Errorf("astits: auto detecting packet size failed: %w", derr)
	}
	return pb, nil
}
// autoDetectPacketSize returns the packet size deduced from the first bytes of r.
//
// The first byte of the reader is assumed to be a sync byte; the packet size is
// the offset of the first following sync byte located at or after
// MpegTsPacketSize. Only the first 193 bytes are inspected, so the largest
// detectable packet size is 192.
//
// Once the size is known the reader is put back in a usable state:
//   - a *bufio.Reader is only peeked at, so nothing was consumed
//   - an io.Seeker is rewound to its start
//   - any other reader cannot be rewound, so bytes are discarded up to the
//     next packet boundary (the first packets are lost)
//
// An error is returned if the first bytes can't be read, if the first byte is
// not a sync byte (ErrPacketMustStartWithASyncByte), if no second sync byte is
// found, or if rewinding/syncing the reader fails.
func autoDetectPacketSize(r io.Reader) (packetSize int, err error) {
	// Read first bytes
	const l = 193
	buf := make([]byte, l)
	shouldRewind, rerr := peek(r, buf)
	if rerr != nil {
		err = fmt.Errorf("astits: reading first %d bytes failed: %w", l, rerr)
		return
	}

	// Packet must start with a sync byte
	if buf[0] != syncByte {
		err = ErrPacketMustStartWithASyncByte
		return
	}

	// Look for the next sync byte
	for idx, v := range buf {
		if v != syncByte || idx < MpegTsPacketSize {
			continue
		}

		// Update packet size
		packetSize = idx
		if !shouldRewind {
			return
		}

		// Rewind or sync reader
		var n int64
		if n, err = rewind(r); err != nil {
			err = fmt.Errorf("astits: rewinding failed: %w", err)
			return
		}
		if n != -1 {
			return
		}

		// The reader can't be rewound: l bytes have been consumed, i.e. one
		// full packet plus (l - packetSize) bytes of the following one.
		// Discard the rest of that packet so the next read starts on a packet
		// boundary. io.ReadFull is required here since a single Read is
		// allowed to return fewer bytes than requested, which would leave the
		// reader misaligned.
		ls := packetSize - (l - packetSize)
		if _, err = io.ReadFull(r, make([]byte, ls)); err != nil {
			// The stream ended in the middle of the discarded packet: the
			// packet size is still valid and there is simply nothing left to
			// read afterwards.
			if errors.Is(err, io.ErrUnexpectedEOF) {
				err = nil
				return
			}
			err = fmt.Errorf("astits: reading %d bytes to sync reader failed: %w", ls, err)
			return
		}
		return
	}

	err = fmt.Errorf("astits: only one sync byte detected in first %d bytes", l)
	return
}
// peek copies the first len(b) bytes of r into b.
//
// bufio.Reader can't be rewound, which would lead to packet loss on packet
// size auto-detection, but it has a handy Peek() method. So bytes are peeked
// for a *bufio.Reader (nothing is consumed, shouldRewind is false) and
// actually read for all other readers (shouldRewind is true, the caller has to
// rewind or re-sync the reader).
//
// For plain readers the buffer is filled with as many reads as needed, since a
// single Read is allowed to return fewer bytes than requested. A stream
// shorter than len(b) is not an error as long as at least one byte was read:
// the remaining bytes of b are left untouched. io.EOF is returned when nothing
// could be read at all.
func peek(r io.Reader, b []byte) (shouldRewind bool, err error) {
	if br, ok := r.(*bufio.Reader); ok {
		bs, perr := br.Peek(len(b))
		if perr != nil {
			return false, perr
		}
		copy(b, bs)
		return false, nil
	}

	if _, err = io.ReadFull(r, b); errors.Is(err, io.ErrUnexpectedEOF) {
		// Partial data is fine, the caller inspects whatever could be read
		err = nil
	}
	return true, err
}
// rewind moves r back to its start when it implements io.Seeker and returns
// the offset reported by Seek. Readers that can't seek are left untouched and
// reported with n = -1 and a nil error.
func rewind(r io.Reader) (n int64, err error) {
	seeker, canSeek := r.(io.Seeker)
	if !canSeek {
		return -1, nil
	}

	if n, err = seeker.Seek(0, io.SeekStart); err != nil {
		err = fmt.Errorf("astits: seeking to 0 failed: %w", err)
	}
	return n, err
}
// next reads and parses packets from the underlying reader until one of them
// is not skipped, and returns it.
//
// ErrNoMorePackets is returned once the reader is exhausted (including when
// the stream ends in the middle of a packet). Any other read or parsing error
// is wrapped and returned.
func (pb *packetBuffer) next() (p *Packet, err error) {
	// Make sure the scratch buffer matches the packet size
	if len(pb.packetReadBuffer) != pb.packetSize || pb.packetReadBuffer == nil {
		pb.packetReadBuffer = make([]byte, pb.packetSize)
	}

	// Keep reading until a packet is actually produced, since the first
	// packets may be skipped
	for {
		// Read one full packet
		if _, err = io.ReadFull(pb.r, pb.packetReadBuffer); err != nil {
			switch err {
			case io.EOF, io.ErrUnexpectedEOF:
				err = ErrNoMorePackets
			default:
				err = fmt.Errorf("astits: reading %d bytes failed: %w", pb.packetSize, err)
			}
			return
		}

		// Parse packet, a skipped packet is not a failure
		p, err = parsePacket(astikit.NewBytesIterator(pb.packetReadBuffer), pb.s)
		if err != nil && !errors.Is(err, errSkippedPacket) {
			err = fmt.Errorf("astits: building packet failed: %w", err)
			return
		}

		if p != nil {
			return
		}
	}
}