Skip to content

Commit 99ab8df

Browse files
committed
[fix] package rtmp: 接收rtmp数据时,同一个message的多个chunk混合使用fmt1,2时,可能出现时间戳多加的情况
1 parent 9e80735 commit 99ab8df

File tree

3 files changed

+33
-8
lines changed

3 files changed

+33
-8
lines changed

pkg/rtmp/chunk_composer.go

+30-5
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,19 @@ type CompleteMessageCB func(stream *Stream) error
4040
// @param cb 回调结束后,内存块会被 ChunkComposer 再次使用
4141
func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
4242
bootstrap := make([]byte, 11)
43+
absTsFlag := false
4344

4445
for {
46+
// 5.3.1.1. Chunk Basic Header
47+
// 读取fmt和csid
4548
if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil {
4649
return err
4750
}
4851

49-
// 5.3.1.1. Chunk Basic Header
5052
fmt := (bootstrap[0] >> 6) & 0x03
5153
csid := int(bootstrap[0] & 0x3f)
5254

55+
// csid可能是变长的
5356
switch csid {
5457
case 0:
5558
if _, err := io.ReadAtLeast(reader, bootstrap[:1], 1); err != nil {
@@ -68,6 +71,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
6871
stream := c.getOrCreateStream(csid)
6972

7073
// 5.3.1.2. Chunk Message Header
74+
// 当前chunk的fmt不同,Message Header包含的字段也不同,是变长
7175
switch fmt {
7276
case 0:
7377
if _, err := io.ReadAtLeast(reader, bootstrap[:11], 11); err != nil {
@@ -76,6 +80,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
7680
// 包头中为绝对时间戳
7781
stream.header.Timestamp = bele.BEUint24(bootstrap)
7882
stream.header.TimestampAbs = stream.header.Timestamp
83+
absTsFlag = true
7984
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
8085
stream.header.MsgTypeID = bootstrap[6]
8186
stream.header.MsgStreamID = int(bele.LEUint32(bootstrap[7:]))
@@ -87,7 +92,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
8792
}
8893
// 包头中为相对时间戳
8994
stream.header.Timestamp = bele.BEUint24(bootstrap)
90-
stream.header.TimestampAbs += stream.header.Timestamp
95+
//stream.header.TimestampAbs += stream.header.Timestamp
9196
stream.header.MsgLen = bele.BEUint24(bootstrap[3:])
9297
stream.header.MsgTypeID = bootstrap[6]
9398

@@ -98,7 +103,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
98103
}
99104
// 包头中为相对时间戳
100105
stream.header.Timestamp = bele.BEUint24(bootstrap)
101-
stream.header.TimestampAbs += stream.header.Timestamp
106+
//stream.header.TimestampAbs += stream.header.Timestamp
102107

103108
case 3:
104109
// noop
@@ -107,7 +112,9 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
107112
// 5.3.1.3 Extended Timestamp
108113
// 使用ffmpeg推流时,发现时间戳超过3字节最大值后,即使是fmt3(即包头大小为0),依然存在ext ts字段
109114
// 所以这里我将 `==` 的判断改成了 `>=`
110-
// TODO chef: 测试其他客户端和ext ts相关的表现
115+
// TODO chef:
116+
// - 测试其他客户端和ext ts相关的表现
117+
// - 这部分可能还有问题,需要根据具体的case调整
111118
//if stream.header.Timestamp == maxTimestampInMessageHeader {
112119
if stream.header.Timestamp >= maxTimestampInMessageHeader {
113120
if _, err := io.ReadAtLeast(reader, bootstrap[:4], 4); err != nil {
@@ -126,7 +133,7 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
126133
}
127134
}
128135
//stream.header.CSID = csid
129-
//log.Debugf("CHEFGREPME tag1 fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
136+
//nazalog.Debugf("ChunkComposer chunk fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
130137

131138
var neededSize uint32
132139
if stream.header.MsgLen <= c.peerChunkSize {
@@ -152,6 +159,13 @@ func (c *ChunkComposer) RunLoop(reader io.Reader, cb CompleteMessageCB) error {
152159
}
153160

154161
stream.header.CSID = csid
162+
if !absTsFlag {
163+
// 这么处理相当于取最后一个chunk的时间戳差值,有的协议栈是取的第一个,正常来说都可以
164+
stream.header.TimestampAbs += stream.header.Timestamp
165+
}
166+
absTsFlag = false
167+
//nazalog.Debugf("ChunkComposer message fmt:%d header:%+v csid:%d len:%d ts:%d", fmt, stream.header, csid, stream.header.MsgLen, stream.header.TimestampAbs)
168+
155169
if err := cb(stream); err != nil {
156170
return err
157171
}
@@ -171,3 +185,14 @@ func (c *ChunkComposer) getOrCreateStream(csid int) *Stream {
171185
}
172186
return stream
173187
}
188+
189+
// 临时存放一些rtmp推流case在这,便于理解,以及修改后,回归用
190+
//
191+
// 场景:ffmpeg推送test.flv至lals
192+
// 关注点:message超过chunk时,fmt和timestamp的值
193+
//
194+
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:143 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:520} csid:6 len:143 ts:520
195+
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560
196+
// ChunkComposer chunk fmt:3 header:{CSID:6 MsgLen:4511 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:560} csid:6 len:4511 ts:560
197+
// 此处应只给上层返回一次,也即一个message,时间戳应该是560
198+
// ChunkComposer chunk fmt:1 header:{CSID:6 MsgLen:904 Timestamp:40 MsgTypeID:9 MsgStreamID:1 TimestampAbs:600} csid:6 len:904 ts:600

pkg/rtmp/handshake.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error) {
193193
// s2
194194
// make digest to s2 suffix position
195195
random1528(s2)
196-
196+
197197
replyOffs := s2Len - keyLen
198198
makeDigestWithoutCenterPart(s2, replyOffs, s2key, s2[replyOffs:])
199199
}
@@ -287,4 +287,3 @@ func init() {
287287
random1528Buf[i] = bs[i%bsl]
288288
}
289289
}
290-

pkg/rtmp/stream.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import (
1717

1818
const initMsgLen = 4096
1919

20+
// TODO chef: 将Timestamp字段隐藏,不对外暴露
2021
type Header struct {
2122
CSID int
2223
MsgLen uint32 // 不包含header的大小
23-
Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。
24+
Timestamp uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。上层不应该使用这个字段,而应该使用TimestampAbs
2425
MsgTypeID uint8 // 8 audio 9 video 18 metadata
2526
MsgStreamID int
2627

0 commit comments

Comments
 (0)