From b3024c8a4e2c42a7ed8cd2772a55a5b5f36e514e Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 13 Jun 2020 10:22:03 +0800 Subject: [PATCH] messages: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - [feat] lalserver增加中继转推(relay push)功能,可将接收到的推流(pub)转推(push)到其他rtmp类型的服务器,支持1对n的转推 - [feat] package rtmp: 新增函数amf0::ReadArray,用于解析amf array数据 - [refactor] `rtmp/client_push_session`增加当前会话连接状态 - [fix] demo/analyseflv: 修复解析metadata的bug - [perf] httpflv协议关闭时,不做httpflv的GOP缓存 - [refactor] logic中的配置变更为全局变量 - [refactor] lal/demo移动到lal/app/demo - [refactor] 日志整理 --- README.md | 91 +++++----- TEST.md | 2 +- {demo => app/demo}/analyseflv/analyseflv.go | 22 ++- {demo => app/demo}/analysehls/analysehls.go | 0 {demo => app/demo}/flvfile2es/flvfile2es.go | 0 .../flvfile2rtmppush/flvfile2rtmppush.go | 0 {demo => app/demo}/httpflvpull/httpflvpull.go | 0 {demo => app/demo}/learnrtsp/learnrtsp.go | 0 {demo => app/demo}/learnts/learnts.go | 0 {demo => app/demo}/modflvfile/modflvfile.go | 0 {demo => app/demo}/rtmppull/rtmppull.go | 0 {demo => app/demo}/tscmp/tscmp.go | 0 build.sh | 17 +- conf/edge.conf.json | 37 ++++ conf/lalserver.conf.json | 5 + conf/lalserver.default.conf.json | 5 + gen_release.sh | 2 +- go.mod | 2 +- go.sum | 4 +- pkg/hls/hls.go | 1 + pkg/hls/muxer.go | 43 +++-- pkg/innertest/innertest.go | 3 + pkg/logic/config.go | 18 +- pkg/logic/entry.go | 2 +- pkg/logic/gop_cache.go | 1 + pkg/logic/group.go | 147 ++++++++++++---- pkg/logic/relay_push.go | 165 ++++++++++++++++++ pkg/logic/server_manager.go | 7 +- pkg/logic/var.go | 14 ++ pkg/rtmp/amf0.go | 70 +++++++- pkg/rtmp/amf0_test.go | 13 ++ pkg/rtmp/client_pull_session.go | 3 +- pkg/rtmp/client_push_session.go | 55 +++++- pkg/rtmp/client_session.go | 9 +- pkg/rtmp/rtmp.go | 1 - pkg/rtmp/server_session.go | 7 +- test.sh | 16 +- 37 files changed, 604 insertions(+), 158 deletions(-) rename {demo => app/demo}/analyseflv/analyseflv.go (89%) rename {demo => app/demo}/analysehls/analysehls.go (100%) rename {demo => app/demo}/flvfile2es/flvfile2es.go (100%) rename {demo => app/demo}/flvfile2rtmppush/flvfile2rtmppush.go (100%) rename {demo => app/demo}/httpflvpull/httpflvpull.go (100%) rename {demo => app/demo}/learnrtsp/learnrtsp.go (100%) rename {demo => app/demo}/learnts/learnts.go (100%) rename {demo => app/demo}/modflvfile/modflvfile.go (100%) rename {demo => app/demo}/rtmppull/rtmppull.go (100%) rename {demo => app/demo}/tscmp/tscmp.go (100%) create mode 100644 conf/edge.conf.json create mode 100644 pkg/logic/relay_push.go create mode 100644 pkg/logic/var.go diff --git a/README.md b/README.md index f2a5a1e0..f4897ac0 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Wide
-Go语言编写的直播流媒体网络传输服务器 +Go live stream lib/client/server and much more.

@@ -27,7 +27,7 @@ Go语言编写的直播流媒体网络传输服务器 --- -已支持RTMP,HTTP-FLV,HLS(m3u8 + ts),H264/AVC,H265/HEVC,AAC,GOP缓存。 +Go直播流媒体网络传输服务器,已支持RTMP,HTTP-FLV,HLS(m3u8+ts),H264/AVC,H265/HEVC,AAC,GOP缓存,中继转推。更多功能持续迭代中。 ### README 目录 @@ -56,7 +56,7 @@ $cd $GOPATH/src/github.com/q191201771/lal $./build.sh # 使用 Go module -$export GOPROXY=https://goproxy.cn +$export GOPROXY=https://goproxy.io $git clone https://github.com/q191201771/lal.git $cd lal $./build.sh @@ -93,6 +93,11 @@ $./bin/lalserver -c conf/lalserver.conf.json "fragment_duration_ms": 3000, // 单个TS文件切片时长,单位毫秒 "fragment_num": 6 // M3U8文件列表中TS文件的数量 }, + "relay_push": { + "enable": false, // 是否开启中继转推功能,开启后,自身接收到的所有流都会转推出去 + "addr_list":[ // 中继转推的对端地址,支持填写多个地址,做1对n的转推 + ] + }, "pprof": { "enable": true, // 是否开启Go pprof web服务的监听 "addr": ":10001" // Go pprof web地址 @@ -110,39 +115,39 @@ $./bin/lalserver -c conf/lalserver.conf.json ### 三. 仓库目录框架 -简单来说,源码在`pkg/`,`app/`,`demo/`三个目录下。 +简单来说,源码在`pkg/`,`app/lalserver/`,`app/demo/`三个目录下。 - `pkg/`:存放各package包,供本repo的程序以及其他业务方使用 -- `app/`:重要程序的入口(目前仅包含lalserver——基于lal编写的一个通用流媒体服务器程序) -- `demo/`:存放各种基于`lal/pkg`开发的小程序(小工具),一个子目录是一个程序,详情见各源码文件中头部的说明 +- `app/lalserver`:基于lal编写的一个通用流媒体服务器程序入口 +- `app/demo/`:存放各种基于`lal/pkg`开发的小程序(小工具),一个子目录是一个程序,详情见各源码文件中头部的说明 ``` -pkg/ ...... -|-- rtmp/ ......RTMP协议 -|-- httpflv/ ......HTTP-FLV协议 -|-- hls/ ......HLS协议 -|-- logic/ ......lalserver服务器程序的上层业务逻辑 -|-- aac/ ......音频AAC编码格式相关 -|-- avc/ ......视频H264/AVC编码格式相关 -|-- hevc/ ......视频H265/HEVC编码格式相关 -|-- innertest/ ......测试代码 - -app/ ...... -|-- lalserver/ ......流媒体服务器lalserver的main函数入口 - -demo/ ...... -|-- analyseflv ...... -|-- analysehls ...... -|-- flvfile2rtmppush ...... -|-- rtmppull ...... -|-- httpflvpull ...... -|-- modflvfile ...... -|-- flvfile2es ...... -|-- learnts ...... -|-- tscmp ...... - -conf/ ......配置文件目录 -bin/ ......可执行文件编译输出目录 +pkg/ ...... +|-- rtmp/ ......RTMP协议 +|-- httpflv/ ......HTTP-FLV协议 +|-- hls/ ......HLS协议 +|-- logic/ ......lalserver服务器程序的上层业务逻辑 +|-- aac/ ......音频AAC编码格式相关 +|-- avc/ ......视频H264/AVC编码格式相关 +|-- hevc/ ......视频H265/HEVC编码格式相关 +|-- innertest/ ......测试代码 + +app/ ...... +|-- lalserver/ ......流媒体服务器lalserver的main函数入口 + +|-- demo/ ...... + |-- analyseflv ...... + |-- analysehls ...... + |-- flvfile2rtmppush ...... + |-- rtmppull ...... + |-- httpflvpull ...... + |-- modflvfile ...... + |-- flvfile2es ...... + |-- learnts ...... + |-- tscmp ...... + +conf/ ......配置文件目录 +bin/ ......可执行文件编译输出目录 ``` 后续我再画些源码架构图。 @@ -152,24 +157,14 @@ bin/ ......可执行文件编译输出目录 ### 四. Roadmap -#### 项目原则: - -* 代码可读可维护 -* 框架清晰,模块化。业务与协议隔离。协议、网络传输等基础功能都是功能纯粹,可独立使用的库。 -* 高性能 -* 提供各种client代码,即使你使用其他流媒体服务器,这些client也是非常好用的 -* 依托Go语言,提供所有平台下最简单的编译、调试、发布方式 -* 不依赖第三方代码 -* 后续可快速集成各种网络传输协议,流媒体封装协议 - #### lalserver服务器功能 -- [x] **pub 接收推流:** RTMP -- [x] **sub 接收拉流:** RTMP,HTTP-FLV,HLS(m3u8+ts) +- [x] **pub接收推流:** RTMP +- [x] **sub接收拉流:** RTMP,HTTP-FLV,HLS(m3u8+ts) - [x] **音频编码格式:** AAC - [x] **视频编码格式:** H264/AVC,H265/HEVC - [x] **GOP缓存:** 用于秒开 -- [ ] RTMP转推 +- [x] **relay push中继转推:** RTMP - [ ] RTMP回源 - [ ] HTTP-FLV回源 - [ ] 静态转推、回源 @@ -204,3 +199,9 @@ bin/ ......可执行文件编译输出目录 见[TEST.md](https://github.com/q191201771/lal/blob/master/TEST.md) +### 八. 项目star趋势图 + +觉得这个repo还不错,就点个star支持一下吧 :) + +[![Stargazers over time](https://starchart.cc/q191201771/lal.svg)](https://starchart.cc/q191201771/lal) + diff --git a/TEST.md b/TEST.md index 3e672199..f5eb1c0f 100644 --- a/TEST.md +++ b/TEST.md @@ -19,7 +19,7 @@ | 1000 | 1000 | 125% | 464MB | * 测试机:32核16G(lalserver服务器和压测工具同时跑在这一个机器上) -* 压测工具:lal中的 `/demo/flvfile2rtmppush` 以及 `/demo/rtmppull` +* 压测工具:lal中的 `/app/demo/flvfile2rtmppush` 以及 `/app/demo/rtmppull` * 推流码率:使用`srs-bench`中的FLV文件,大概200kbps * lalserver版本:基于 git commit: xxx diff --git a/demo/analyseflv/analyseflv.go b/app/demo/analyseflv/analyseflv.go similarity index 89% rename from demo/analyseflv/analyseflv.go rename to app/demo/analyseflv/analyseflv.go index 6b726d6d..d49566c7 100644 --- a/demo/analyseflv/analyseflv.go +++ b/app/demo/analyseflv/analyseflv.go @@ -99,17 +99,18 @@ func main() { switch tag.Header.Type { case httpflv.TagTypeMetadata: - //nazalog.Debugf("----------\n", hex.Dump(tag.Raw)) if printMetaData { + nazalog.Debugf("----------\n%s", hex.Dump(tag.Raw[11:])) + // TODO chef: 这部分可以移入到rtmp package中 _, l, err := rtmp.AMF0.ReadString(tag.Raw[11:]) nazalog.Assert(nil, err) - kv, _, err := rtmp.AMF0.ReadObject(tag.Raw[11+l:]) + ops, _, err := rtmp.AMF0.ReadArray(tag.Raw[11+l : len(tag.Raw)-4]) nazalog.Assert(nil, err) var buf bytes.Buffer - buf.WriteString(fmt.Sprintf("-----\ncount:%d\n", len(kv))) - for k, v := range kv { - buf.WriteString(fmt.Sprintf(" %s: %v\n", k, v)) + buf.WriteString(fmt.Sprintf("-----\ncount:%d\n", len(ops))) + for _, op := range ops { + buf.WriteString(fmt.Sprintf(" %s: %+v\n", op.Key, op.Value)) } nazalog.Debugf("%+v", buf.String()) } @@ -172,8 +173,17 @@ func analysisVideoTag(tag httpflv.Tag) { } else { body := tag.Raw[11:] - for i := 5; i != int(tag.Header.DataSize); { + i := 5 + for i != int(tag.Header.DataSize) { + if i+4 > int(tag.Header.DataSize) { + nazalog.Errorf("invalid nalu size. i=%d, tag size=%d", i, int(tag.Header.DataSize)) + break + } naluLen := bele.BEUint32(body[i:]) + if i+int(naluLen) > int(tag.Header.DataSize) { + nazalog.Errorf("invalid nalu size. i=%d, naluLen=%d, tag size=%d", i, naluLen, int(tag.Header.DataSize)) + break + } switch t { case typeAVC: if avc.CalcNaluType(body[i+4:]) == avc.NaluUnitTypeIDRSlice { diff --git a/demo/analysehls/analysehls.go b/app/demo/analysehls/analysehls.go similarity index 100% rename from demo/analysehls/analysehls.go rename to app/demo/analysehls/analysehls.go diff --git a/demo/flvfile2es/flvfile2es.go b/app/demo/flvfile2es/flvfile2es.go similarity index 100% rename from demo/flvfile2es/flvfile2es.go rename to app/demo/flvfile2es/flvfile2es.go diff --git a/demo/flvfile2rtmppush/flvfile2rtmppush.go b/app/demo/flvfile2rtmppush/flvfile2rtmppush.go similarity index 100% rename from demo/flvfile2rtmppush/flvfile2rtmppush.go rename to app/demo/flvfile2rtmppush/flvfile2rtmppush.go diff --git a/demo/httpflvpull/httpflvpull.go b/app/demo/httpflvpull/httpflvpull.go similarity index 100% rename from demo/httpflvpull/httpflvpull.go rename to app/demo/httpflvpull/httpflvpull.go diff --git a/demo/learnrtsp/learnrtsp.go b/app/demo/learnrtsp/learnrtsp.go similarity index 100% rename from demo/learnrtsp/learnrtsp.go rename to app/demo/learnrtsp/learnrtsp.go diff --git a/demo/learnts/learnts.go b/app/demo/learnts/learnts.go similarity index 100% rename from demo/learnts/learnts.go rename to app/demo/learnts/learnts.go diff --git a/demo/modflvfile/modflvfile.go b/app/demo/modflvfile/modflvfile.go similarity index 100% rename from demo/modflvfile/modflvfile.go rename to app/demo/modflvfile/modflvfile.go diff --git a/demo/rtmppull/rtmppull.go b/app/demo/rtmppull/rtmppull.go similarity index 100% rename from demo/rtmppull/rtmppull.go rename to app/demo/rtmppull/rtmppull.go diff --git a/demo/tscmp/tscmp.go b/app/demo/tscmp/tscmp.go similarity index 100% rename from demo/tscmp/tscmp.go rename to app/demo/tscmp/tscmp.go diff --git a/build.sh b/build.sh index 3bb55d27..70aabaf3 100755 --- a/build.sh +++ b/build.sh @@ -24,19 +24,14 @@ LDFlags=" \ -X 'github.com/q191201771/naza/pkg/bininfo.BuildGoVersion=${BuildGoVersion}' \ " -for file in `ls ${ROOT_DIR}/app` -do - if [ -d ${ROOT_DIR}/app/${file} ]; then - echo "build" ${ROOT_DIR}/app/${file} "..." - cd ${ROOT_DIR}/app/${file} && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${file} - fi -done +echo "build" ${ROOT_DIR}/app/lalserver "..." +cd ${ROOT_DIR}/app/lalserver && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/lalserver -for file in `ls ${ROOT_DIR}/demo` +for file in `ls ${ROOT_DIR}/app/demo` do - if [ -d ${ROOT_DIR}/demo/${file} ]; then - echo "build" ${ROOT_DIR}/demo/${file} "..." - cd ${ROOT_DIR}/demo/${file} && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${file} + if [ -d ${ROOT_DIR}/app/demo/${file} ]; then + echo "build" ${ROOT_DIR}/app/demo/${file} "..." + cd ${ROOT_DIR}/app/demo/${file} && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${file} fi done diff --git a/conf/edge.conf.json b/conf/edge.conf.json new file mode 100644 index 00000000..7c666633 --- /dev/null +++ b/conf/edge.conf.json @@ -0,0 +1,37 @@ +{ + "rtmp": { + "enable": true, + "addr": ":19351", + "gop_num": 2 + }, + "httpflv": { + "enable": false, + "sub_listen_addr": ":8080", + "gop_num": 2 + }, + "hls": { + "enable": false, + "sub_listen_addr": ":8081", + "out_path": "/tmp/lal/hls/", + "fragment_duration_ms": 3000, + "fragment_num": 6 + }, + "relay_push": { + "enable": true, + "addr_list":[ + "127.0.0.1:19350" + ] + }, + "pprof": { + "enable": false, + "addr": ":10001" + }, + "log": { + "level": 1, + "filename": "./logs/edge.log", + "is_to_stdout": true, + "is_rotate_daily": true, + "short_file_flag": true, + "assert_behavior": 1 + } +} diff --git a/conf/lalserver.conf.json b/conf/lalserver.conf.json index 4a696473..f6a983a4 100644 --- a/conf/lalserver.conf.json +++ b/conf/lalserver.conf.json @@ -16,6 +16,11 @@ "fragment_duration_ms": 3000, "fragment_num": 6 }, + "relay_push": { + "enable": false, + "addr_list":[ + ] + }, "pprof": { "enable": true, "addr": ":10001" diff --git a/conf/lalserver.default.conf.json b/conf/lalserver.default.conf.json index 4a696473..f6a983a4 100644 --- a/conf/lalserver.default.conf.json +++ b/conf/lalserver.default.conf.json @@ -16,6 +16,11 @@ "fragment_duration_ms": 3000, "fragment_num": 6 }, + "relay_push": { + "enable": false, + "addr_list":[ + ] + }, "pprof": { "enable": true, "addr": ":10001" diff --git a/gen_release.sh b/gen_release.sh index 30945ebc..ccaa7027 100755 --- a/gen_release.sh +++ b/gen_release.sh @@ -5,7 +5,7 @@ ROOT_DIR=`pwd` OUT_DIR=release -v=`git tag | tail -n 1` +v=`git tag --sort=version:refname | tail -n 1` prefix=lal_${v}_ rm -rf ${ROOT_DIR}/${OUT_DIR} diff --git a/go.mod b/go.mod index 2132abd4..653bdb87 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.13.0 +require github.com/q191201771/naza v0.13.1 diff --git a/go.sum b/go.sum index d117586d..d4e4a7ed 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.13.0 h1:tHgsMlMu9dHGmL26cGpFJDeP1qdFwbXVJHPg6IlAuvo= -github.com/q191201771/naza v0.13.0/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.13.1 h1:eA2n87AfADKKFNtRyulKbuD942POferOHZwmAfwBw7I= +github.com/q191201771/naza v0.13.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index d352b602..541f1e12 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -17,6 +17,7 @@ package hls // - 配置项 // - Server // - 超时时间 +// - 考虑删除过期的TS文件,并考虑做一个全量TS的m3u8作为点播用 // https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist // https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index ce4c97f9..5b52f844 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -13,6 +13,8 @@ import ( "fmt" "os" + "github.com/q191201771/naza/pkg/unique" + "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/rtmp" @@ -28,13 +30,14 @@ type fragmentInfo struct { } type MuxerConfig struct { - Enable bool `json:"enable"` OutPath string `json:"out_path"` FragmentDurationMS int `json:"fragment_duration_ms"` FragmentNum int `json:"fragment_num"` } type Muxer struct { + UniqueKey string + streamName string outPath string playlistFilename string @@ -61,6 +64,9 @@ type Muxer struct { } func NewMuxer(streamName string, config *MuxerConfig) *Muxer { + uk := unique.GenUniqueKey("HLSMUXER") + nazalog.Infof("lifecycle new hls muxer. [%s] streamName=%s", uk, streamName) + op := getMuxerOutPath(config.OutPath, streamName) playlistFilename := getM3U8Filename(op, streamName) playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) @@ -68,6 +74,7 @@ func NewMuxer(streamName string, config *MuxerConfig) *Muxer { videoOut = videoOut[0:0] frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1 return &Muxer{ + UniqueKey: uk, streamName: streamName, outPath: op, playlistFilename: playlistFilename, @@ -80,12 +87,12 @@ func NewMuxer(streamName string, config *MuxerConfig) *Muxer { } func (m *Muxer) Start() { - nazalog.Infof("start hls muxer. streamName=%s", m.streamName) + nazalog.Infof("start hls muxer. [%s]", m.UniqueKey) m.ensureDir() } -func (m *Muxer) Stop() { - nazalog.Infof("stop hls muxer. streamName=%s", m.streamName) +func (m *Muxer) Dispose() { + nazalog.Infof("lifecycle dispose hls muxer. [%s]", m.UniqueKey) m.flushAudio() m.closeFragment() } @@ -102,7 +109,7 @@ func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) { // TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接 func (m *Muxer) feedVideo(msg rtmp.AVMsg) { if len(msg.Payload) < 5 { - nazalog.Errorf("invalid video message length. len=%d", len(msg.Payload)) + nazalog.Errorf("invalid video message length. [%s] len=%d", m.UniqueKey, len(msg.Payload)) return } if msg.Payload[0]&0xF != 7 { @@ -126,13 +133,13 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) { out := m.videoOut[0:0] for i := 5; i != len(msg.Payload); { if i+4 > len(msg.Payload) { - nazalog.Errorf("slice len not enough. i=%d, len=%d", i, len(msg.Payload)) + nazalog.Errorf("slice len not enough. [%s] i=%d, len=%d", m.UniqueKey, i, len(msg.Payload)) return } nalBytes := int(bele.BEUint32(msg.Payload[i:])) i += 4 if i+nalBytes > len(msg.Payload) { - nazalog.Errorf("slice len not enough. i=%d, payload len=%d, nalBytes=%d", i, len(msg.Payload), nalBytes) + nazalog.Errorf("slice len not enough. [%s] i=%d, payload len=%d, nalBytes=%d", m.UniqueKey, i, len(msg.Payload), nalBytes) return } srcNalType := msg.Payload[i] @@ -141,7 +148,7 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) { //nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) if nalType >= 7 && nalType <= 9 { - nazalog.Warn("should not reach here.") + //nazalog.Warn("should not reach here.") i += nalBytes continue } @@ -190,7 +197,7 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) { m.updateFragment(frame.dts, boundary, 1) if !m.opened { - nazalog.Warn("not opened.") + nazalog.Warnf("not opened. [%s]", m.UniqueKey) return } @@ -200,10 +207,9 @@ func (m *Muxer) feedVideo(msg rtmp.AVMsg) { func (m *Muxer) feedAudio(msg rtmp.AVMsg) { if len(msg.Payload) < 3 { - nazalog.Errorf("invalid audio message length. len=%d", len(msg.Payload)) + nazalog.Errorf("invalid audio message length. [%s] len=%d", m.UniqueKey, len(msg.Payload)) } if msg.Payload[0]>>4 != 10 { - // TODO chef: HLS音频现在只做了h264的支持 return } @@ -212,6 +218,11 @@ func (m *Muxer) feedAudio(msg rtmp.AVMsg) { return } + if m.adts.IsNil() { + nazalog.Warnf("feed audio message but aac seq header not exist. [%s]", m.UniqueKey) + return + } + pts := uint64(msg.Header.TimestampAbs) * 90 m.updateFragment(pts, m.spspps == nil, 2) @@ -226,7 +237,7 @@ func (m *Muxer) feedAudio(msg rtmp.AVMsg) { } func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) { - m.adts.PutAACSequenceHeader(msg.Payload) + _ = m.adts.PutAACSequenceHeader(msg.Payload) } func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) { @@ -234,6 +245,7 @@ func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) { } func (m *Muxer) appendSPSPPS(out []byte) []byte { + // TODO chef: 检查spspps是否存在 index := 10 nnals := m.spspps[index] & 0x1f index++ @@ -266,7 +278,7 @@ func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) { // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10) if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) { - nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", m.fragTS, ts) + nazalog.Warnf("force fragment split. [%s] fragTS=%d, ts=%d", m.UniqueKey, m.fragTS, ts) force = true } else { // TODO chef: 考虑ts比fragTS小的情况 @@ -300,7 +312,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) { id := m.getFragmentID() filename := getTSFilename(m.outPath, m.streamName, id) - m.fragmentOP.OpenFile(filename) + _ = m.fragmentOP.OpenFile(filename) m.opened = true frag := m.getFrag(m.nfrags) @@ -396,12 +408,11 @@ func (m *Muxer) nextFrag() { // 4. 流关闭时 func (m *Muxer) flushAudio() { if !m.opened { - nazalog.Warn("flushAudio by not opened.") + nazalog.Warnf("flushAudio by not opened. [%s]", m.UniqueKey) return } if m.aaframe == nil { - nazalog.Warn("flushAudio by aframe is nil.") return } diff --git a/pkg/innertest/innertest.go b/pkg/innertest/innertest.go index aee2f223..59c7a218 100644 --- a/pkg/innertest/innertest.go +++ b/pkg/innertest/innertest.go @@ -150,9 +150,11 @@ func InnerTestEntry(t *testing.T) { func compareFile() { r, err := ioutil.ReadFile(rFLVFileName) assert.Equal(tt, nil, err) + nazalog.Debugf("%s filesize:%d", rFLVFileName, len(r)) w, err := ioutil.ReadFile(wFLVPullFileName) assert.Equal(tt, nil, err) + nazalog.Debugf("%s filesize:%d", wFLVPullFileName, len(w)) res := bytes.Compare(r, w) assert.Equal(tt, 0, res) err = os.Remove(wFLVPullFileName) @@ -160,6 +162,7 @@ func compareFile() { w2, err := ioutil.ReadFile(wRTMPPullFileName) assert.Equal(tt, nil, err) + nazalog.Debugf("%s filesize:%d", wRTMPPullFileName, len(w2)) res = bytes.Compare(r, w2) assert.Equal(tt, 0, res) err = os.Remove(wRTMPPullFileName) diff --git a/pkg/logic/config.go b/pkg/logic/config.go index 51b86548..bac8a32e 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -19,9 +19,10 @@ import ( ) type Config struct { - RTMPConfig RTMPConfig `json:"rtmp"` - HTTPFLVConfig HTTPFLVConfig `json:"httpflv"` - HLSConfig HLSConfig `json:"hls"` + RTMPConfig RTMPConfig `json:"rtmp"` + HTTPFLVConfig HTTPFLVConfig `json:"httpflv"` + HLSConfig HLSConfig `json:"hls"` + RelayPushConfig RelayPushConfig `json:"relay_push"` PProfConfig PProfConfig `json:"pprof"` LogConfig nazalog.Option `json:"log"` @@ -40,8 +41,14 @@ type HTTPFLVConfig struct { } type HLSConfig struct { + Enable bool `json:"enable"` SubListenAddr string `json:"sub_listen_addr"` - *hls.MuxerConfig + hls.MuxerConfig +} + +type RelayPushConfig struct { + Enable bool `json:"enable"` + AddrList []string `json:"addr_list"` } type PProfConfig struct { @@ -65,7 +72,8 @@ func LoadConf(confFile string) (*Config, error) { } // 检查配置必须项 - if !j.Exist("rtmp") || !j.Exist("httpflv") || !j.Exist("hls") || !j.Exist("log") || !j.Exist("pprof") { + if !j.Exist("rtmp") || !j.Exist("httpflv") || !j.Exist("hls") || !j.Exist("relay_push") || + !j.Exist("pprof") || !j.Exist("log") { return &config, errors.New("missing key field in config file") } diff --git a/pkg/logic/entry.go b/pkg/logic/entry.go index 92a73d4f..1ac21944 100644 --- a/pkg/logic/entry.go +++ b/pkg/logic/entry.go @@ -27,7 +27,7 @@ func Entry(confFile string) { initLog(config.LogConfig) nazalog.Infof("bininfo: %s", bininfo.StringifySingleLine()) - sm := NewServerManager(config) + sm := NewServerManager() if config.PProfConfig.Enable { go runWebPProf(config.PProfConfig.Addr) diff --git a/pkg/logic/gop_cache.go b/pkg/logic/gop_cache.go index 353b8b58..67c791ae 100644 --- a/pkg/logic/gop_cache.go +++ b/pkg/logic/gop_cache.go @@ -97,6 +97,7 @@ func (gc *GOPCache) Feed(msg rtmp.AVMsg, lg LazyGet) { } } + // 这个size的判断去掉也行 if gc.gopSize > 1 { if msg.IsVideoKeyNalu() { gc.feedNewGOP(msg, lg()) diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 4bedf9a1..d7a873e6 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -9,24 +9,25 @@ package logic import ( + "fmt" "sync" "github.com/q191201771/lal/pkg/hls" "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" - log "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazalog" "github.com/q191201771/naza/pkg/unique" ) +// TODO chef: group可以考虑搞个协程 + type Group struct { UniqueKey string appName string streamName string - hlsConfig *hls.MuxerConfig - exitChan chan struct{} mutex sync.Mutex @@ -35,25 +36,32 @@ type Group struct { httpflvSubSessionSet map[*httpflv.SubSession]struct{} hlsMuxer *hls.Muxer gopCache *GOPCache - // TODO chef: 如果没有开启httpflv监听,可以不做格式转换,节约CPU资源 - httpflvGopCache *GOPCache -} + httpflvGopCache *GOPCache -var _ rtmp.PubSessionObserver = &Group{} + relayPushList []*RelayPush +} -func NewGroup(appName string, streamName string, rtmpGOPNum int, httpflvGOPNum int, hlsConfig *hls.MuxerConfig) *Group { +func NewGroup(appName string, streamName string) *Group { uk := unique.GenUniqueKey("GROUP") - log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) + nazalog.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) + var relayPushList []*RelayPush + if config.RelayPushConfig.Enable { + for _, addr := range config.RelayPushConfig.AddrList { + url := fmt.Sprintf("rtmp://%s/%s/%s", addr, appName, streamName) + relayPush := NewRelayPush(url) + relayPushList = append(relayPushList, relayPush) + } + } return &Group{ UniqueKey: uk, appName: appName, streamName: streamName, - hlsConfig: hlsConfig, exitChan: make(chan struct{}, 1), rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}), httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), - gopCache: NewGOPCache("rtmp", uk, rtmpGOPNum), - httpflvGopCache: NewGOPCache("httpflv", uk, rtmpGOPNum), + gopCache: NewGOPCache("rtmp", uk, config.RTMPConfig.GOPNum), + httpflvGopCache: NewGOPCache("httpflv", uk, config.HTTPFLVConfig.GOPNum), + relayPushList: relayPushList, } } @@ -62,48 +70,84 @@ func (group *Group) RunLoop() { } func (group *Group) Dispose() { - log.Infof("lifecycle dispose group. [%s]", group.UniqueKey) + nazalog.Infof("lifecycle dispose group. [%s]", group.UniqueKey) group.exitChan <- struct{}{} group.mutex.Lock() defer group.mutex.Unlock() + if group.pubSession != nil { group.pubSession.Dispose() + group.pubSession = nil } + for session := range group.rtmpSubSessionSet { session.Dispose() } + group.rtmpSubSessionSet = nil + for session := range group.httpflvSubSessionSet { session.Dispose() } + group.httpflvSubSessionSet = nil + + if group.hlsMuxer != nil { + group.hlsMuxer.Dispose() + group.hlsMuxer = nil + } + + if config.RelayPushConfig.Enable { + for _, rp := range group.relayPushList { + rp.Dispose() + } + } } func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool { - log.Debugf("add PubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("add PubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + group.mutex.Lock() + defer group.mutex.Unlock() + if group.pubSession != nil { - log.Errorf("PubSession already exist in group. [%s] old=%s, new=%s", group.UniqueKey, group.pubSession.UniqueKey, session.UniqueKey) + nazalog.Errorf("PubSession already exist in group. [%s] old=%s, new=%s", group.UniqueKey, group.pubSession.UniqueKey, session.UniqueKey) return false } - group.pubSession = session - if group.hlsConfig.Enable { - group.hlsMuxer = hls.NewMuxer(group.streamName, group.hlsConfig) + + if config.HLSConfig.Enable { + group.hlsMuxer = hls.NewMuxer(group.streamName, &config.HLSConfig.MuxerConfig) group.hlsMuxer.Start() } - group.mutex.Unlock() + + if config.RelayPushConfig.Enable { + for _, rp := range group.relayPushList { + rp.Start() + } + } session.SetPubSessionObserver(group) + return true } func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { - log.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("del PubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + group.mutex.Lock() defer group.mutex.Unlock() + group.pubSession = nil - if group.hlsConfig.Enable { - group.hlsMuxer.Stop() + + if config.HLSConfig.Enable && group.hlsMuxer != nil { + group.hlsMuxer.Dispose() + group.hlsMuxer = nil + } + + if config.RelayPushConfig.Enable { + for _, rp := range group.relayPushList { + rp.Stop() + } } group.gopCache.Clear() @@ -111,7 +155,7 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { } func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { - log.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("add SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() defer group.mutex.Unlock() group.rtmpSubSessionSet[session] = struct{}{} @@ -121,14 +165,14 @@ func (group *Group) AddRTMPSubSession(session *rtmp.ServerSession) { } func (group *Group) DelRTMPSubSession(session *rtmp.ServerSession) { - log.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("del SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() defer group.mutex.Unlock() delete(group.rtmpSubSessionSet, session) } func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) { - log.Debugf("add httpflv SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("add httpflv SubSession into group. [%s] [%s]", group.UniqueKey, session.UniqueKey) session.WriteHTTPResponseHeader() session.WriteFLVHeader() @@ -138,7 +182,7 @@ func (group *Group) AddHTTPFLVSubSession(session *httpflv.SubSession) { } func (group *Group) DelHTTPFLVSubSession(session *httpflv.SubSession) { - log.Debugf("del httpflv SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) + nazalog.Debugf("del httpflv SubSession from group. [%s] [%s]", group.UniqueKey, session.UniqueKey) group.mutex.Lock() defer group.mutex.Unlock() delete(group.httpflvSubSessionSet, session) @@ -161,16 +205,19 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) { group.mutex.Lock() defer group.mutex.Unlock() - //log.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1]) + p := make([]byte, len(msg.Payload)) + copy(p, msg.Payload) + msg.Payload = p + + //nazalog.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1]) group.broadcastRTMP(msg) - if group.hlsConfig.Enable { + + if config.HLSConfig.Enable && group.hlsMuxer != nil { group.hlsMuxer.FeedRTMPMessage(msg) } } func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { - //log.Infof("%+v", msg.Header) - var ( lcd LazyChunkDivider lrm2ft LazyRTMPMsg2FLVTag @@ -212,6 +259,35 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { _ = session.AsyncWrite(lcd.Get()) } + // TODO chef: rtmp sub, rtmp push, httpflv sub 的发送逻辑都差不多,可以考虑封装一下 + if config.RelayPushConfig.Enable { + for _, rp := range group.relayPushList { + if !rp.Connected() { + continue + } + if rp.IsFresh { + if group.gopCache.Metadata != nil { + _ = rp.AsyncWrite(group.gopCache.Metadata) + } + if group.gopCache.VideoSeqHeader != nil { + _ = rp.AsyncWrite(group.gopCache.VideoSeqHeader) + } + if group.gopCache.AACSeqHeader != nil { + _ = rp.AsyncWrite(group.gopCache.AACSeqHeader) + } + for i := 0; i < group.gopCache.GetGOPCount(); i++ { + for _, item := range group.gopCache.GetGOPDataAt(i) { + _ = rp.AsyncWrite(item) + } + } + + rp.IsFresh = false + } + + _ = rp.AsyncWrite(lcd.Get()) + } + } + // # 4. 广播。遍历所有 httpflv sub session,转发数据 for session := range group.httpflvSubSessionSet { if session.IsFresh { @@ -236,7 +312,12 @@ func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { session.WriteRawPacket(lrm2ft.Get()) } - // # 4. 缓存关键信息,以及gop - group.gopCache.Feed(msg, lcd.Get) - group.httpflvGopCache.Feed(msg, lrm2ft.Get) + // # 5. 缓存关键信息,以及gop + if config.RTMPConfig.Enable { + group.gopCache.Feed(msg, lcd.Get) + } + + if config.HTTPFLVConfig.Enable { + group.httpflvGopCache.Feed(msg, lrm2ft.Get) + } } diff --git a/pkg/logic/relay_push.go b/pkg/logic/relay_push.go new file mode 100644 index 00000000..5e495e30 --- /dev/null +++ b/pkg/logic/relay_push.go @@ -0,0 +1,165 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +import ( + "sync" + "time" + + "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/unique" + + "github.com/q191201771/lal/pkg/rtmp" +) + +// TODO chef: 结合Group和Session做一次重构 + +type RelayPushStatus uint + +const ( + RelayPushStatusStart RelayPushStatus = iota + RelayPushStatusStop + RelayPushStatusDispose +) + +type RelayPush struct { + IsFresh bool + + url string + uk string + + notifyChan chan struct{} + + mutex sync.Mutex + t RelayPushStatus + session *rtmp.PushSession +} + +func NewRelayPush(url string) *RelayPush { + uk := unique.GenUniqueKey("RELAYPUSH") + nazalog.Infof("lifecycle new relaypush. [%s] url=%s", uk, url) + rp := &RelayPush{ + url: url, + uk: uk, + notifyChan: make(chan struct{}, 1), + IsFresh: true, + } + go rp.runLoop() + return rp +} + +func (rp *RelayPush) Start() { + rp.notify(RelayPushStatusStart) +} + +func (rp *RelayPush) Stop() { + rp.notify(RelayPushStatusStop) +} + +func (rp *RelayPush) Dispose() { + nazalog.Infof("lifecycle dispose relaypush. [%s]", rp.uk) + rp.notify(RelayPushStatusDispose) +} + +func (rp *RelayPush) Connected() bool { + rp.mutex.Lock() + defer rp.mutex.Unlock() + + return rp.connected() +} + +func (rp *RelayPush) AsyncWrite(msg []byte) error { + rp.mutex.Lock() + defer rp.mutex.Unlock() + + if !rp.connected() { + return ErrLogic + } + return rp.session.AsyncWrite(msg) +} + +func (rp *RelayPush) connected() bool { + return rp.session != nil && rp.session.Status() == rtmp.PushSessionStatusConnected +} + +func (rp *RelayPush) runLoop() { + ticker := time.NewTicker(time.Duration(relayPushCheckIntervalMS) * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-rp.notifyChan: + if rp.do() { + return + } + case <-ticker.C: + if rp.do() { + return + } + } + } +} + +func (rp *RelayPush) do() (dispose bool) { + rp.mutex.Lock() + defer rp.mutex.Unlock() + + switch rp.t { + case RelayPushStatusStart: + if rp.session == nil { + rp.buildNewSession() + } + if rp.session.Status() == rtmp.PushSessionStatusError { + nazalog.Infof("relay push error. [%s] [%s]", rp.uk, rp.session.UniqueKey()) + rp.buildNewSession() + } + if rp.session.Status() == rtmp.PushSessionStatusInit { + go func(s *rtmp.PushSession) { + nazalog.Infof("start relay push. [%s]", rp.uk) + err := s.Push(rp.url) + if err == nil { + nazalog.Infof("relay push succ. [%s]", rp.uk) + } else { + nazalog.Warnf("relay push fail. [%s] err=%+v", rp.uk, err) + } + }(rp.session) + } + dispose = false + case RelayPushStatusStop: + if rp.session != nil && rp.session.Status() == rtmp.PushSessionStatusConnected { + rp.session.Dispose() + rp.session = nil + } + dispose = false + case RelayPushStatusDispose: + if rp.session != nil && rp.session.Status() == rtmp.PushSessionStatusConnected { + rp.session.Dispose() + rp.session = nil + } + dispose = true + } + return +} + +func (rp *RelayPush) buildNewSession() { + rp.session = rtmp.NewPushSession(func(option *rtmp.PushSessionOption) { + option.ConnectTimeoutMS = relayPushConnectTimeoutMS + option.PushTimeoutMS = relayPushTimeoutMS + option.WriteAVTimeoutMS = relayPushWriteAVTimeoutMS + }) + rp.IsFresh = true +} + +func (rp *RelayPush) notify(t RelayPushStatus) { + rp.t = t + select { + case rp.notifyChan <- struct{}{}: + default: + } +} diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 60a52da5..ed67c2be 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -21,8 +21,6 @@ import ( ) type ServerManager struct { - config *Config - rtmpServer *rtmp.Server httpflvServer *httpflv.Server hlsServer *hls.Server @@ -32,9 +30,8 @@ type ServerManager struct { groupMap map[string]*Group // TODO chef: with appName } -func NewServerManager(config *Config) *ServerManager { +func NewServerManager() *ServerManager { m := &ServerManager{ - config: config, groupMap: make(map[string]*Group), exitChan: make(chan struct{}), } @@ -198,7 +195,7 @@ func (sm *ServerManager) check() { func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { group, exist := sm.groupMap[streamName] if !exist { - group = NewGroup(appName, streamName, sm.config.RTMPConfig.GOPNum, sm.config.HTTPFLVConfig.GOPNum, sm.config.HLSConfig.MuxerConfig) + group = NewGroup(appName, streamName) sm.groupMap[streamName] = group } go group.RunLoop() diff --git a/pkg/logic/var.go b/pkg/logic/var.go new file mode 100644 index 00000000..3cb01140 --- /dev/null +++ b/pkg/logic/var.go @@ -0,0 +1,14 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package logic + +var relayPushCheckIntervalMS = 1000 +var relayPushConnectTimeoutMS = 5000 +var relayPushTimeoutMS = 5000 +var relayPushWriteAVTimeoutMS = 5000 diff --git a/pkg/rtmp/amf0.go b/pkg/rtmp/amf0.go index f298d60e..b2776ffd 100644 --- a/pkg/rtmp/amf0.go +++ b/pkg/rtmp/amf0.go @@ -18,7 +18,7 @@ import ( "io" "github.com/q191201771/naza/pkg/bele" - log "github.com/q191201771/naza/pkg/nazalog" + "github.com/q191201771/naza/pkg/nazalog" ) var ( @@ -32,6 +32,7 @@ const ( AMF0TypeMarkerString = uint8(0x02) AMF0TypeMarkerObject = uint8(0x03) AMF0TypeMarkerNull = uint8(0x05) + AMF0TypeMarkerEcmaArray = uint8(0x08) AMF0TypeMarkerObjectEnd = uint8(0x09) AMF0TypeMarkerLongString = uint8(0x0c) @@ -39,7 +40,6 @@ const ( //AMF0TypeMarkerMovieclip = uint8(0x04) //AMF0TypeMarkerUndefined = uint8(0x06) //AMF0TypeMarkerReference = uint8(0x07) - //AMF0TypeMarkerEcmaArray = uint8(0x08) //AMF0TypeMarkerStrictArray = uint8(0x0a) //AMF0TypeMarkerData = uint8(0x0b) //AMF0TypeMarkerUnsupported = uint8(0x0d) @@ -128,7 +128,7 @@ func (amf0) WriteObject(writer io.Writer, objs []ObjectPair) error { return err } default: - log.Panicf("unknown value type. i=%d, v=%v", i, objs[i].Value) + nazalog.Panicf("unknown value type. i=%d, v=%+v", i, objs[i].Value) } } _, err := writer.Write(AMF0TypeMarkerObjectEndBytes) @@ -265,10 +265,68 @@ func (amf0) ReadObject(b []byte) (map[string]interface{}, int, error) { obj[k] = v index += l default: - log.Panicf("unknown type. vt=%d", vt) + nazalog.Panicf("unknown type. vt=%d", vt) + } + } +} + +// TODO chef: +// - 实现WriteArray +// - ReadArray和ReadObject有些代码重复 + +func (amf0) ReadArray(b []byte) ([]ObjectPair, int, error) { + if len(b) < 5 { + return nil, 0, ErrAMFTooShort + } + if b[0] != AMF0TypeMarkerEcmaArray { + return nil, 0, ErrAMFInvalidType + } + count := int(bele.BEUint32(b[1:])) + + index := 5 + var ops []ObjectPair + for i := 0; i < count; i++ { + k, l, err := AMF0.ReadStringWithoutType(b[index:]) + if err != nil { + return nil, 0, err + } + index += l + if len(b)-index < 1 { + return nil, 0, ErrAMFTooShort + } + vt := b[index] + switch vt { + case AMF0TypeMarkerString: + v, l, err := AMF0.ReadString(b[index:]) + if err != nil { + return nil, 0, err + } + ops = append(ops, ObjectPair{k, v}) + index += l + case AMF0TypeMarkerBoolean: + v, l, err := AMF0.ReadBoolean(b[index:]) + if err != nil { + return nil, 0, err + } + ops = append(ops, ObjectPair{k, v}) + index += l + case AMF0TypeMarkerNumber: + v, l, err := AMF0.ReadNumber(b[index:]) + if err != nil { + return nil, 0, err + } + ops = append(ops, ObjectPair{k, v}) + index += l + default: + nazalog.Panicf("unknown type. vt=%d", vt) } } - //panic("should not reach here.") - //return nil, 0, nil + if len(b)-index >= 3 && bytes.Equal(b[index:index+3], AMF0TypeMarkerObjectEndBytes) { + index += 3 + } else { + // 测试时发现Array最后也是以00 00 09结束,不确定是否是标准规定的,加个日志在这 + nazalog.Warn("amf ReadArray without suffix AMF0TypeMarkerObjectEndBytes.") + } + return ops, index, nil } diff --git a/pkg/rtmp/amf0_test.go b/pkg/rtmp/amf0_test.go index 813d0401..00dd4a34 100644 --- a/pkg/rtmp/amf0_test.go +++ b/pkg/rtmp/amf0_test.go @@ -13,6 +13,8 @@ import ( "strings" "testing" + "github.com/q191201771/naza/pkg/nazalog" + . "github.com/q191201771/lal/pkg/rtmp" "github.com/q191201771/naza/pkg/assert" "github.com/q191201771/naza/pkg/fake" @@ -104,6 +106,17 @@ func TestAmf0_WriteBoolean_ReadBoolean(t *testing.T) { } } +func TestAmf0_ReadArray(t *testing.T) { + gold := []byte{0x08, 0x00, 0x00, 0x00, 0x10, 0x00, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x77, 0x69, 0x64, 0x74, 0x68, 0x00, 0x40, 0x88, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x00, 0x40, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x72, 0x61, 0x74, 0x65, 0x00, 0x40, 0x69, 0xe8, 0x50, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x72, 0x61, 0x74, 0x65, 0x00, 0x40, 0x39, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0c, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x69, 0x64, 0x00, 0x40, 0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x72, 0x61, 0x74, 0x65, 0x00, 0x40, 0x3d, 0x54, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x72, 0x61, 0x74, 0x65, 0x00, 0x40, 0xe5, 0x88, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x69, 0x7a, 0x65, 0x00, 0x40, 0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x73, 0x74, 0x65, 0x72, 0x65, 0x6f, 0x01, 0x01, 0x00, 0x0c, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x69, 0x64, 0x00, 0x40, 0x24, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x6d, 0x61, 0x6a, 0x6f, 0x72, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x64, 0x02, 0x00, 0x04, 0x69, 0x73, 0x6f, 0x6d, 0x00, 0x0d, 0x6d, 0x69, 0x6e, 0x6f, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02, 0x00, 0x03, 0x35, 0x31, 0x32, 0x00, 0x11, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x64, 0x73, 0x02, 0x00, 0x10, 0x69, 0x73, 0x6f, 0x6d, 0x69, 0x73, 0x6f, 0x32, 0x61, 0x76, 0x63, 0x31, 0x6d, 0x70, 0x34, 0x31, 0x00, 0x07, 0x65, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x02, 0x00, 0x0d, 0x4c, 0x61, 0x76, 0x66, 0x35, 0x37, 0x2e, 0x38, 0x33, 0x2e, 0x31, 0x30, 0x30, 0x00, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x69, 0x7a, 0x65, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09} + + ops, l, err := AMF0.ReadArray(gold) + assert.Equal(t, nil, err) + assert.Equal(t, 16, len(ops)) + assert.Equal(t, 359, len(gold)) + assert.Equal(t, 359, l) + nazalog.Debug(ops) +} + func TestAMF0Corner(t *testing.T) { var ( mw *fake.Writer diff --git a/pkg/rtmp/client_pull_session.go b/pkg/rtmp/client_pull_session.go index 89a66ff5..e23631d0 100644 --- a/pkg/rtmp/client_pull_session.go +++ b/pkg/rtmp/client_pull_session.go @@ -49,7 +49,8 @@ func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error if err := s.core.doWithTimeout(rawURL); err != nil { return err } - return s.core.WaitLoop() + + return <-s.core.Done() } func (s *PullSession) Dispose() { diff --git a/pkg/rtmp/client_push_session.go b/pkg/rtmp/client_push_session.go index 7e03042e..1b9a6d9d 100644 --- a/pkg/rtmp/client_push_session.go +++ b/pkg/rtmp/client_push_session.go @@ -8,8 +8,20 @@ package rtmp +import "sync/atomic" + +type PushSessionStatus uint32 + +const ( + PushSessionStatusInit PushSessionStatus = iota + PushSessionStatusConnecting + PushSessionStatusConnected + PushSessionStatusError +) + type PushSession struct { - core *ClientSession + core *ClientSession + status uint32 } type PushSessionOption struct { @@ -37,24 +49,57 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession { option.DoTimeoutMS = opt.PushTimeoutMS option.WriteAVTimeoutMS = opt.WriteAVTimeoutMS }), + status: 0, } } // 阻塞直到收到服务端返回的 rtmp publish 对应结果的信令或发生错误 func (s *PushSession) Push(rawURL string) error { - return s.core.doWithTimeout(rawURL) + s.setStatus(PushSessionStatusConnecting) + err := s.core.doWithTimeout(rawURL) + if err == nil { + s.setStatus(PushSessionStatusConnected) + } else { + s.setStatus(PushSessionStatusError) + } + return err } func (s *PushSession) AsyncWrite(msg []byte) error { - return s.core.AsyncWrite(msg) + err := s.core.AsyncWrite(msg) + if err != nil { + s.setStatus(PushSessionStatusError) + } + return err } func (s *PushSession) Flush() error { - return s.core.Flush() + err := s.core.Flush() + if err != nil { + s.setStatus(PushSessionStatusError) + } + return err } func (s *PushSession) Dispose() { + s.setStatus(PushSessionStatusError) s.core.Dispose() } -// TODO chef: 建议 ClientSession WaitLoop 接口也可以暴露出来 +func (s *PushSession) Status() PushSessionStatus { + v := atomic.LoadUint32(&s.status) + return PushSessionStatus(v) +} + +func (s *PushSession) Done() <-chan error { + return s.core.Done() +} + +func (s *PushSession) UniqueKey() string { + return s.core.UniqueKey +} + +func (s *PushSession) setStatus(status PushSessionStatus) { + i := uint32(status) + atomic.StoreUint32(&s.status, i) +} diff --git a/pkg/rtmp/client_session.go b/pkg/rtmp/client_session.go index 98b2c239..52405a0e 100644 --- a/pkg/rtmp/client_session.go +++ b/pkg/rtmp/client_session.go @@ -156,9 +156,8 @@ func (s *ClientSession) do(rawURL string) <-chan error { return ch } -func (s *ClientSession) WaitLoop() error { - err := <-s.conn.Done() - return err +func (s *ClientSession) Done() <-chan error { + return s.conn.Done() } func (s *ClientSession) AsyncWrite(msg []byte) error { @@ -353,7 +352,8 @@ func (s *ClientSession) doProtocolControlMessage(stream *Stream) error { s.peerWinAckSize = val log.Infof("-----> Window Acknowledgement Size: %d. [%s]", s.peerWinAckSize, s.UniqueKey) case typeidBandwidth: - log.Warnf("-----> Set Peer Bandwidth. ignore. [%s]", s.UniqueKey) + // TODO chef: 是否需要关注这个信令 + log.Debugf("-----> Set Peer Bandwidth. ignore. [%s]", s.UniqueKey) case typeidSetChunkSize: // composer内部会自动更新peer chunk size. log.Infof("-----> Set Chunk Size %d. [%s]", val, s.UniqueKey) @@ -428,6 +428,7 @@ func (s *ClientSession) tcpConnect() error { s.conn = connection.New(conn, func(option *connection.Option) { option.ReadBufSize = readBufSize + option.WriteChanFullBehavior = connection.WriteChanFullBehaviorBlock }) return nil } diff --git a/pkg/rtmp/rtmp.go b/pkg/rtmp/rtmp.go index bc6445aa..0bd560fc 100644 --- a/pkg/rtmp/rtmp.go +++ b/pkg/rtmp/rtmp.go @@ -117,7 +117,6 @@ func (msg AVMsg) IsAACSeqHeader() bool { } type AVMsgObserver interface { - OnReadRTMPAVMsg(msg AVMsg) } type OnReadRTMPAVMsg func(msg AVMsg) diff --git a/pkg/rtmp/server_session.go b/pkg/rtmp/server_session.go index ae0a1392..660ca82b 100644 --- a/pkg/rtmp/server_session.go +++ b/pkg/rtmp/server_session.go @@ -28,7 +28,8 @@ type ServerSessionObserver interface { var _ ServerSessionObserver = &Server{} type PubSessionObserver interface { - AVMsgObserver + // 注意,回调结束后,内部会复用Payload内存块 + OnReadRTMPAVMsg(msg AVMsg) } func (s *ServerSession) SetPubSessionObserver(obs PubSessionObserver) { @@ -223,7 +224,9 @@ func (s *ServerSession) doCommandMessage(stream *Stream) error { case "FCUnpublish": fallthrough case "getStreamLength": - log.Warnf("read command message, ignore it. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) + fallthrough + case "deleteStream": + log.Debugf("read command message, ignore it. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) default: log.Errorf("read unknown command message. [%s] cmd=%s, %s", s.UniqueKey, cmd, stream.toDebugString()) } diff --git a/test.sh b/test.sh index 60eb8bde..ad62adfd 100755 --- a/test.sh +++ b/test.sh @@ -72,18 +72,10 @@ if [ ! -f "./pkg/hls/testdata/test.flv" ]; then fi # 将配置文件分别拷贝到logic,rtmp,httpflv,hls的testdata目录下 -if [ ! -f "./pkg/logic/testdata/lalserver.default.conf.json" ]; then - cp ./conf/lalserver.default.conf.json ./pkg/logic/testdata/lalserver.default.conf.json -fi -if [ ! -f "./pkg/rtmp/testdata/lalserver.default.conf.json" ]; then - cp ./conf/lalserver.default.conf.json ./pkg/rtmp/testdata/lalserver.default.conf.json -fi -if [ ! -f "./pkg/httpflv/testdata/lalserver.default.conf.json" ]; then - cp ./conf/lalserver.default.conf.json ./pkg/httpflv/testdata/lalserver.default.conf.json -fi -if [ ! -f "./pkg/hls/testdata/lalserver.default.conf.json" ]; then - cp ./conf/lalserver.default.conf.json ./pkg/hls/testdata/lalserver.default.conf.json -fi +cp ./conf/lalserver.default.conf.json ./pkg/logic/testdata/lalserver.default.conf.json +cp ./conf/lalserver.default.conf.json ./pkg/rtmp/testdata/lalserver.default.conf.json +cp ./conf/lalserver.default.conf.json ./pkg/httpflv/testdata/lalserver.default.conf.json +cp ./conf/lalserver.default.conf.json ./pkg/hls/testdata/lalserver.default.conf.json echo "" > coverage.txt for d in $(go list ./... | grep -v vendor | grep pkg | grep -v innertest); do