diff --git a/.gitignore b/.gitignore index a392ef43..244ed20d 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,6 @@ coverage.html /conf/self.conf.json /tmp -/demo /bin /release /.idea diff --git a/README.md b/README.md index 0164d1f8..c6d2330d 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ Go语言编写的直播流媒体网络传输服务器 --- -已支持RTMP,HTTP-FLV,H264/AVC,H265/HEVC,AAC,GOP缓存。正在实现HLS部分。 +已支持RTMP,HTTP-FLV,HLS(m3u8 + ts),H264/AVC,H265/HEVC,AAC,GOP缓存。 ### README 目录 @@ -84,6 +84,12 @@ $./bin/lals -c conf/lals.conf.json "sub_listen_addr": ":8080", // HTTP-FLV拉流地址 "gop_num": 2 }, + "hls": { + "sub_listen_addr": ":8081", // HLS监听地址 + "out_path": "/tmp/lal/hls/", // HLS文件保存根目录 + "fragment_duration_ms": 3000, // 单个TS文件切片时长 + "fragment_num": 6 // M3U8文件列表中TS文件的数量 + }, "log": { "level": 1, // 日志级别,1 debug, 2 info, 3 warn, 4 error, 5 fatal "filename": "./logs/lals.log", // 日志输出文件 @@ -105,37 +111,42 @@ $./bin/lals -c conf/lals.conf.json ### 三. 仓库目录框架 -简单来说,源码在`app/`和`pkg/`两个目录下,后续我再画些源码架构图。 +简单来说,源码在`pkg/`,`app/`,`demo/`三个目录下。 + +- `pkg/`:存放各package包,供本repo的程序以及其他业务方使用 +- `app/`:重要程序的入口(目前仅包含lals——基于lal编写的一个通用流媒体服务器程序) +- `demo/`:存放各种基于`lal/pkg`开发的小程序(小工具),一个子目录是一个程序,详情见各源码文件中头部的说明 ``` -pkg/ ......源码包 +pkg/ ...... |-- rtmp/ ......RTMP协议 |-- httpflv/ ......HTTP-FLV协议 -|-- logic/ ......lals服务器的上层业务 +|-- hls/ ......HLS协议 +|-- logic/ ......lals服务器程序的上层业务逻辑 |-- aac/ ......音频AAC编码格式相关 |-- avc/ ......视频H264/AVC编码格式相关 |-- hevc/ ......视频H265/HEVC编码格式相关 -app/ ......各种main包的源码文件,一个子目录对应一个main包,也即对应可生成一个可执行文件 -|-- lals/ ......[最重要的]流媒体服务器 -|-- flvfile2rtmppush ......// RTMP推流客户端,读取本地FLV文件,使用RTMP协议推送出去 - // - // 支持循环推送:文件推送完毕后,可循环推送(RTMP push流并不断开) - // 支持推送多路流:相当于一个RTMP推流压测工具 - -|-- rtmppull ......// RTMP拉流客户端,从远端服务器拉取RTMP流,存储为本地FLV文件 - // - // 另外,作为一个RTMP拉流压测工具,已经支持: - // 1. 对一路流拉取n份 - // 2. 拉取n路流 - -|-- httpflvpull ......HTTP-FLV拉流客户端 -|-- modflvfile ......修改本地FLV文件 -|-- flvfile2es ......将本地FLV文件分离成H264/AVC和AAC的ES流文件 +app/ ...... +|-- lals/ ......流媒体服务器lals的main函数入口 + +demo/ ...... +|-- analyseflvts ...... +|-- analysehlsts ...... +|-- flvfile2rtmppush ...... +|-- rtmppull ...... +|-- httpflvpull ...... +|-- modflvfile ...... +|-- flvfile2es ...... +|-- learnts ...... +|-- tscmp ...... + bin/ ......可执行文件编译输出目录 conf/ ......配置文件目录 ``` +后续我再画些源码架构图。 + 目前唯一的第三方依赖(我自己写的Go基础库): [github.com/q191201771/naza](https://github.com/q191201771/naza) @@ -151,16 +162,16 @@ conf/ ......配置文件目录 * 不依赖第三方代码 * 后续可快速集成各种网络传输协议,流媒体封装协议 -#### 功能 +#### lals服务器功能 + +- [x] **pub 接收推流:** RTMP +- [x] **sub 接收拉流:** RTMP,HTTP-FLV,HLS(m3u8+ts) +- [x] **音频编码格式:** AAC +- [x] **视频编码格式:** H264/AVC,H265/HEVC +- [x] **GOP缓存:** 用于秒开 + +TODO -- 接收RTMP推流 [DONE] -- 转发给RTMP拉流 [DONE] -- 转发给HTTP-FLV拉流 [DONE] -- AAC [DONE] -- H264/AVC [DONE] -- H265/HEVC [DONE] -- GOP缓存 [DONE] -- HLS - RTMP转推 - RTMP回源 - HTTP-FLV回源 diff --git a/app/httpflvpull/httpflvpull.go b/app/httpflvpull/httpflvpull.go deleted file mode 100644 index 1a05813e..00000000 --- a/app/httpflvpull/httpflvpull.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2019, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package main - -import ( - "encoding/hex" - "flag" - "os" - "time" - - "github.com/q191201771/lal/pkg/avc" - "github.com/q191201771/lal/pkg/hevc" - "github.com/q191201771/naza/pkg/bele" - - "github.com/q191201771/lal/pkg/httpflv" - "github.com/q191201771/naza/pkg/bitrate" - "github.com/q191201771/naza/pkg/nazaatomic" - log "github.com/q191201771/naza/pkg/nazalog" -) - -// TODO chef: 存储成 flv 文件 - -func main() { - url := parseFlag() - session := httpflv.NewPullSession() - abr := bitrate.New() - vbr := bitrate.New() - var runFlag nazaatomic.Bool - runFlag.Store(true) - go func() { - for runFlag.Load() { - time.Sleep(1 * time.Second) - } - }() - err := session.Pull(url, func(tag httpflv.Tag) { - switch tag.Header.Type { - case httpflv.TagTypeMetadata: - log.Info(hex.Dump(tag.Payload())) - case httpflv.TagTypeAudio: - abr.Add(len(tag.Raw)) - case httpflv.TagTypeVideo: - log.Infof("onReadFLVTag. %+v, isSeqHeader=%t, isKeyNalu=%t", tag.Header, tag.IsVideoKeySeqHeader(), tag.IsVideoKeyNalu()) - analysisVideoTag(tag) - vbr.Add(len(tag.Raw)) - } - }) - runFlag.Store(false) - log.Assert(nil, err) -} - -const ( - typeUnknown uint8 = 1 - typeAVC uint8 = 2 - typeHEVC uint8 = 3 -) - -var t uint8 = typeUnknown - -func analysisVideoTag(tag httpflv.Tag) { - if tag.IsVideoKeySeqHeader() { - if tag.IsAVCKeySeqHeader() { - t = typeAVC - log.Info("AVC SH") - } else if tag.IsHEVCKeySeqHeader() { - t = typeHEVC - log.Info("HEVC SH") - } - } else { - body := tag.Raw[11:] - - for i := 5; i != int(tag.Header.DataSize); { - naluLen := bele.BEUint32(body[i:]) - switch t { - case typeAVC: - log.Infof("%s %s", avc.CalcNaluTypeReadable(body[i+4:]), hex.Dump(body[i+4:i+8])) - case typeHEVC: - log.Infof("%s %s", hevc.CalcNaluTypeReadable(body[i+4:]), hex.Dump(body[i+4:i+8])) - } - i = i + 4 + int(naluLen) - } - } -} - -func parseFlag() string { - url := flag.String("i", "", "specify http-flv url") - flag.Parse() - if *url == "" { - flag.Usage() - os.Exit(1) - } - return *url -} diff --git a/build.sh b/build.sh index dc142439..23d3e513 100755 --- a/build.sh +++ b/build.sh @@ -32,6 +32,14 @@ do fi done +for file in `ls ${ROOT_DIR}/demo` +do + if [ -d ${ROOT_DIR}/demo/${file} ]; then + echo "build" ${ROOT_DIR}/demo/${file} "..." + cd ${ROOT_DIR}/demo/${file} && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/${OUT_DIR}/${file} + fi +done + for file in `ls ${ROOT_DIR}/playground` do if [ -d ${ROOT_DIR}/playground/${file} ]; then diff --git a/conf/lals.conf.json b/conf/lals.conf.json index 5ddb5f9b..693ee86b 100644 --- a/conf/lals.conf.json +++ b/conf/lals.conf.json @@ -7,6 +7,12 @@ "sub_listen_addr": ":8080", "gop_num": 2 }, + "hls": { + "sub_listen_addr": ":8081", + "out_path": "/tmp/lal/hls/", + "fragment_duration_ms": 3000, + "fragment_num": 6 + }, "log": { "level": 1, "filename": "./logs/lals.log", diff --git a/conf/lals.default.conf.json b/conf/lals.default.conf.json index 5ddb5f9b..693ee86b 100644 --- a/conf/lals.default.conf.json +++ b/conf/lals.default.conf.json @@ -7,6 +7,12 @@ "sub_listen_addr": ":8080", "gop_num": 2 }, + "hls": { + "sub_listen_addr": ":8081", + "out_path": "/tmp/lal/hls/", + "fragment_duration_ms": 3000, + "fragment_num": 6 + }, "log": { "level": 1, "filename": "./logs/lals.log", diff --git a/demo/analyseflv/analyseflv.go b/demo/analyseflv/analyseflv.go new file mode 100644 index 00000000..e2392bcc --- /dev/null +++ b/demo/analyseflv/analyseflv.go @@ -0,0 +1,186 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "bytes" + "encoding/hex" + "flag" + "fmt" + "os" + "time" + + "github.com/q191201771/lal/pkg/rtmp" + + "github.com/q191201771/lal/pkg/avc" + "github.com/q191201771/lal/pkg/hevc" + "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/bitrate" + + "github.com/q191201771/lal/pkg/httpflv" + "github.com/q191201771/naza/pkg/nazalog" +) + +// 分析诊断HTTP-FLV流的时间戳。注意,这个程序还没有完成。 +// 功能: +// - 时间戳回退检查 +// - 当音频时间戳出现回退时打error日志 +// - 当视频时间戳出现回退时打error日志 +// - 将音频和视频时间戳看成一个整体,出现回退时打error日志 +// - 定时打印: +// - 总体带宽 +// - 音频带宽 +// - 视频带宽 +// - 视频DTS和PTS不相等的计数 +// - H264 +// - 打印每个tag的类型:key seq header... +// - 打印每个tag中有多少个帧:SPS PPS SEI IDR SLICE... +// - 打印每个SLICE的类型:I、P、B... + +// TODO +// - 解析metadata +// - 检查时间戳正向大的跳跃 + +var ( + printStatFlag = true + printEveryTagFlag = false + printMetaData = true + analysisVideoTagFlag = false +) + +func main() { + url := parseFlag() + session := httpflv.NewPullSession() + + brTotal := bitrate.New() + brAudio := bitrate.New() + brVideo := bitrate.New() + + prevAudioTS := int64(-1) + prevVideoTS := int64(-1) + prevTS := int64(-1) + + videoCTSNotZeroCount := 0 + + go func() { + for { + time.Sleep(1 * time.Second) + if printStatFlag { + nazalog.Debugf("stat. total=%dKb/s, audio=%dKb/s, video=%dKb/s, videoCTSNotZeroCount=%d", int(brTotal.Rate()), int(brAudio.Rate()), int(brVideo.Rate()), videoCTSNotZeroCount) + } + } + }() + + err := session.Pull(url, func(tag httpflv.Tag) { + if printEveryTagFlag { + debugLength := 32 + if len(tag.Raw) < 32 { + debugLength = len(tag.Raw) + } + nazalog.Debugf("header=%+v, hex=%s", tag.Header, hex.Dump(tag.Raw[11:debugLength])) + } + + brTotal.Add(len(tag.Raw)) + + switch tag.Header.Type { + case httpflv.TagTypeMetadata: + //nazalog.Debugf("----------\n", hex.Dump(tag.Raw)) + if printMetaData { + // TODO chef: 这部分可以移入到rtmp package中 + _, l, err := rtmp.AMF0.ReadString(tag.Raw[11:]) + nazalog.Assert(nil, err) + kv, _, err := rtmp.AMF0.ReadObject(tag.Raw[11+l:]) + nazalog.Assert(nil, err) + var buf bytes.Buffer + buf.WriteString(fmt.Sprintf("-----\ncount:%d\n", len(kv))) + for k, v := range kv { + buf.WriteString(fmt.Sprintf(" %s: %v\n", k, v)) + } + nazalog.Debugf("%+v", buf.String()) + } + case httpflv.TagTypeAudio: + brAudio.Add(len(tag.Raw)) + + if prevAudioTS != -1 && int64(tag.Header.Timestamp) < prevAudioTS { + nazalog.Errorf("audio timestamp error. header=%+v, prevAudioTS=%d, diff=%d", tag.Header, prevAudioTS, int64(tag.Header.Timestamp)-prevAudioTS) + } + if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS { + nazalog.Errorf("audio timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS) + } + prevAudioTS = int64(tag.Header.Timestamp) + prevTS = int64(tag.Header.Timestamp) + case httpflv.TagTypeVideo: + if analysisVideoTagFlag { + analysisVideoTag(tag) + } + + videoCTS := bele.BEUint24(tag.Raw[13:]) + if videoCTS != 0 { + videoCTSNotZeroCount++ + } + + brVideo.Add(len(tag.Raw)) + + if prevVideoTS != -1 && int64(tag.Header.Timestamp) < prevVideoTS { + nazalog.Errorf("video timestamp error. header=%+v, prevVideoTS=%d, diff=%d", tag.Header, prevVideoTS, int64(tag.Header.Timestamp)-prevVideoTS) + } + if prevTS != -1 && int64(tag.Header.Timestamp) < prevTS { + nazalog.Errorf("video timestamp error. header=%+v, prevTS=%d, diff=%d", tag.Header, prevTS, int64(tag.Header.Timestamp)-prevTS) + } + prevVideoTS = int64(tag.Header.Timestamp) + prevTS = int64(tag.Header.Timestamp) + } + }) + nazalog.Warn(err) +} + +const ( + typeUnknown uint8 = 1 + typeAVC uint8 = 2 + typeHEVC uint8 = 3 +) + +var t uint8 = typeUnknown + +func analysisVideoTag(tag httpflv.Tag) { + var buf bytes.Buffer + if tag.IsVideoKeySeqHeader() { + if tag.IsAVCKeySeqHeader() { + t = typeAVC + buf.WriteString(" [AVC SeqHeader] ") + } else if tag.IsHEVCKeySeqHeader() { + t = typeHEVC + buf.WriteString(" [HEVC SeqHeader] ") + } + } else { + body := tag.Raw[11:] + + for i := 5; i != int(tag.Header.DataSize); { + naluLen := bele.BEUint32(body[i:]) + switch t { + case typeAVC: + buf.WriteString(fmt.Sprintf(" [%s(%s)] ", avc.CalcNaluTypeReadable(body[i+4:]), avc.CalcSliceTypeReadable(body[i+4:]))) + case typeHEVC: + buf.WriteString(fmt.Sprintf(" [%s] ", hevc.CalcNaluTypeReadable(body[i+4:]))) + } + i = i + 4 + int(naluLen) + } + } + nazalog.Debug(buf.String()) +} + +func parseFlag() string { + url := flag.String("i", "", "specify http-flv url") + flag.Parse() + if *url == "" { + flag.Usage() + os.Exit(1) + } + return *url +} diff --git a/demo/analysehls/analysehls.go b/demo/analysehls/analysehls.go new file mode 100644 index 00000000..69da4c9c --- /dev/null +++ b/demo/analysehls/analysehls.go @@ -0,0 +1,127 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "flag" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/q191201771/naza/pkg/lru" + "github.com/q191201771/naza/pkg/nazahttp" + "github.com/q191201771/naza/pkg/nazalog" +) + +// 分析诊断HLS的时间戳。注意,这个程序还没有完成。 +// +// TODO chef: 有的代码考虑弄到pkg/hls中 + +type M3U8PullSession struct { +} + +type frag struct { + extinf float64 + filename string +} + +func parseM3U8(content string) (ret []frag) { + var err error + + lines := strings.Split(content, "\n") + var f frag + for _, line := range lines { + if strings.HasPrefix(line, "#EXTINF:") { + line = strings.TrimPrefix(line, "#EXTINF:") + line = strings.TrimSuffix(line, ",") + f.extinf, err = strconv.ParseFloat(line, 64) + nazalog.Assert(nil, err) + } + if strings.Index(line, ".ts") != -1 { + f.filename = line + ret = append(ret, f) + } + } + return +} + +func getTSURL(m3u8URL string, tsFilename string) string { + index := strings.LastIndex(m3u8URL, "/") + nazalog.Assert(true, index != -1) + path := m3u8URL[:index+1] + return path + tsFilename +} + +func main() { + m3u8URL := parseFlag() + nazalog.Infof("m3u8 url=%s", m3u8URL) + + cache := lru.New(1024) + + var m sync.Mutex + var frags []frag + + go func() { + for { + content, err := nazahttp.GetHttpFile(m3u8URL, 3000) + if err != nil { + nazalog.Error(err) + return + } + //nazalog.Debugf("\n-----m3u8-----\n%s", string(content)) + + currFrags := parseM3U8(string(content)) + //nazalog.Debugf("%+v", currFrags) + + m.Lock() + for _, f := range currFrags { + if _, exist := cache.Get(f.filename); exist { + continue + } + cache.Put(f.filename, nil) + + nazalog.Infof("> new frag. filename=%s", f.filename) + frags = append(frags, f) + } + m.Unlock() + + time.Sleep(100 * time.Millisecond) + } + }() + + for { + m.Lock() + currFrags := frags + frags = nil + m.Unlock() + + for _, f := range currFrags { + nazalog.Infof("< new frag. filename=%s", f.filename) + tsURL := getTSURL(m3u8URL, f.filename) + nazalog.Debug(tsURL) + content, err := nazahttp.GetHttpFile(tsURL, 3000) + nazalog.Assert(nil, err) + nazalog.Debugf("TS len=%d", len(content)) + } + + time.Sleep(100 * time.Millisecond) + } +} + +func parseFlag() string { + url := flag.String("i", "", "specify m3u8 url") + flag.Parse() + if *url == "" { + flag.Usage() + os.Exit(1) + } + return *url +} diff --git a/app/flvfile2es/flvfile2es.go b/demo/flvfile2es/flvfile2es.go similarity index 99% rename from app/flvfile2es/flvfile2es.go rename to demo/flvfile2es/flvfile2es.go index 3e8a0984..d4cc3d45 100644 --- a/app/flvfile2es/flvfile2es.go +++ b/demo/flvfile2es/flvfile2es.go @@ -21,6 +21,7 @@ import ( ) // 将本地FLV文件分离成H264/AVC和AAC的ES流文件 +// // TODO chef 做HEVC的支持 func main() { diff --git a/app/flvfile2rtmppush/flvfile2rtmppush.go b/demo/flvfile2rtmppush/flvfile2rtmppush.go similarity index 100% rename from app/flvfile2rtmppush/flvfile2rtmppush.go rename to demo/flvfile2rtmppush/flvfile2rtmppush.go diff --git a/demo/httpflvpull/httpflvpull.go b/demo/httpflvpull/httpflvpull.go new file mode 100644 index 00000000..a114063b --- /dev/null +++ b/demo/httpflvpull/httpflvpull.go @@ -0,0 +1,48 @@ +// Copyright 2019, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package main + +import ( + "encoding/hex" + "flag" + "os" + + "github.com/q191201771/lal/pkg/httpflv" + log "github.com/q191201771/naza/pkg/nazalog" +) + +// 拉取HTTP-FLV的流 +// +// TODO +// - 存储成flv文件 +// - 拉取HTTP-FLV流进行分析参见另外一个demo:analyseflvts。 这个demo可能可以删除掉了。 + +func main() { + url := parseFlag() + session := httpflv.NewPullSession() + err := session.Pull(url, func(tag httpflv.Tag) { + switch tag.Header.Type { + case httpflv.TagTypeMetadata: + log.Info(hex.Dump(tag.Payload())) + case httpflv.TagTypeAudio: + case httpflv.TagTypeVideo: + } + }) + log.Assert(nil, err) +} + +func parseFlag() string { + url := flag.String("i", "", "specify http-flv url") + flag.Parse() + if *url == "" { + flag.Usage() + os.Exit(1) + } + return *url +} diff --git a/app/learnts/learnts.go b/demo/learnts/learnts.go similarity index 93% rename from app/learnts/learnts.go rename to demo/learnts/learnts.go index 831144b2..511cae20 100644 --- a/app/learnts/learnts.go +++ b/demo/learnts/learnts.go @@ -15,7 +15,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// 解析TS文件,注意,该程序还没有写完 +// 学习如何解析TS文件。注意,该程序还没有写完。 var ( pat hls.PAT @@ -85,7 +85,7 @@ func main() { content, err := ioutil.ReadFile(filename) nazalog.Assert(nil, err) - packets := hls.SplitTS(content) + packets := hls.SplitFragment2TSPackets(content) for _, packet := range packets { handlePacket(packet) diff --git a/app/modflvfile/modflvfile.go b/demo/modflvfile/modflvfile.go similarity index 100% rename from app/modflvfile/modflvfile.go rename to demo/modflvfile/modflvfile.go diff --git a/app/rtmppull/rtmppull.go b/demo/rtmppull/rtmppull.go similarity index 100% rename from app/rtmppull/rtmppull.go rename to demo/rtmppull/rtmppull.go diff --git a/app/tscmp/tscmp.go b/demo/tscmp/tscmp.go similarity index 91% rename from app/tscmp/tscmp.go rename to demo/tscmp/tscmp.go index 1f967e22..1eadcac9 100644 --- a/app/tscmp/tscmp.go +++ b/demo/tscmp/tscmp.go @@ -17,7 +17,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// 比较两个TS文件,注意,该程序还没有写完 +// 临时小工具,比较两个TS文件。注意,该程序还没有写完。 var filename1 = "/Volumes/Data/tmp/lal-4.ts" var filename2 = "/Volumes/Data/tmp/nrm-4.ts" @@ -63,8 +63,8 @@ func main() { content2, err := ioutil.ReadFile(filename2) nazalog.Assert(nil, err) - tss1 := hls.SplitTS(content1) - tss2 := hls.SplitTS(content2) + tss1 := hls.SplitFragment2TSPackets(content1) + tss2 := hls.SplitFragment2TSPackets(content2) nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) diff --git a/go.mod b/go.mod index e8f9204a..750be5a8 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.12.2 +require github.com/q191201771/naza v0.12.3 diff --git a/go.sum b/go.sum index 044e43a8..e056e9bf 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.12.2 h1:El5OSCPFrGGrZiyZ0aOvdystC15Ap7lC4MipVKdfVMY= -github.com/q191201771/naza v0.12.2/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.12.3 h1:0Z8hMa5RYNqsG1GjGfYyLFkuPLfuZ21iDx3BJEPK0p8= +github.com/q191201771/naza v0.12.3/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/avc/avc.go b/pkg/avc/avc.go index cb2547c8..f1f84f6c 100644 --- a/pkg/avc/avc.go +++ b/pkg/avc/avc.go @@ -13,9 +13,8 @@ import ( "io" "math" - "github.com/q191201771/naza/pkg/nazabits" - "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/nazabits" ) var ErrAVC = errors.New("lal.avc: fxxk") @@ -61,42 +60,52 @@ const ( SliceTypeSI uint8 = 4 // TODO chef ) +func CalcNaluType(nalu []byte) uint8 { + return nalu[0] & 0x1f +} + func CalcSliceType(nalu []byte) uint8 { c := nalu[1] - var leadingZeroBits uint - index := uint(6) + var leadingZeroBits int + index := 6 // can't unsigned for ; index >= 0; index-- { - v := nazabits.GetBit8(c, index) + v := nazabits.GetBit8(c, uint(index)) if v == 0 { leadingZeroBits++ } else { break } } - rbLeadingZeroBits := nazabits.GetBits8(c, index-1, leadingZeroBits) - codeNum := uint(math.Pow(2, float64(leadingZeroBits))) - 1 + uint(rbLeadingZeroBits) + rbLeadingZeroBits := nazabits.GetBits8(c, uint(index-1), uint(leadingZeroBits)) + codeNum := int(math.Pow(2, float64(leadingZeroBits))) - 1 + int(rbLeadingZeroBits) if codeNum > 4 { codeNum -= 5 } return uint8(codeNum) } -func CalcSliceTypeReadable(nalu []byte) string { - t := CalcSliceType(nalu) - ret, ok := SliceTypeMapping[t] +func CalcNaluTypeReadable(nalu []byte) string { + t := nalu[0] & 0x1f + ret, ok := NaluUintTypeMapping[t] if !ok { return "unknown" } return ret } -func CalcNaluType(nalu []byte) uint8 { - return nalu[0] & 0x1f -} +func CalcSliceTypeReadable(nalu []byte) string { + naluType := CalcNaluType(nalu) + switch naluType { + case NaluUnitTypeSEI: + fallthrough + case NaluUintTypeSPS: + fallthrough + case NaluUintTypePPS: + return "" + } -func CalcNaluTypeReadable(nalu []byte) string { - t := nalu[0] & 0x1f - ret, ok := NaluUintTypeMapping[t] + t := CalcSliceType(nalu) + ret, ok := SliceTypeMapping[t] if !ok { return "unknown" } diff --git a/pkg/hls/mpegts.go b/pkg/hls/fragment.go similarity index 89% rename from pkg/hls/mpegts.go rename to pkg/hls/fragment.go index bf816131..d72b50fb 100644 --- a/pkg/hls/mpegts.go +++ b/pkg/hls/fragment.go @@ -14,25 +14,29 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) -// TODO chef: 这个文件需要和session.go一起重构 +type FragmentOP struct { + fp *os.File +} -type MPEGTSFrame struct { +type mpegTSFrame struct { pts uint64 dts uint64 pid uint16 sid uint8 cc uint8 - key bool + key bool // 关键帧 } -func mpegtsOpenFile(filename string) *os.File { - fp, err := os.Create(filename) - nazalog.Assert(nil, err) - mpegtsWriteFile(fp, FixedFragmentHeader) - return fp +func (f *FragmentOP) OpenFile(filename string) (err error) { + f.fp, err = os.Create(filename) + if err != nil { + return + } + f.writeFile(FixedFragmentHeader) + return nil } -func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { +func (f *FragmentOP) WriteFrame(frame *mpegTSFrame, b []byte) { //nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b)) wpos := 0 // 当前packet的写入位置 @@ -47,7 +51,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { frame.cc++ // 每个packet都需要添加TS Header - // -----TS Header----- + // -----TS Header---------------- // sync_byte // transport_error_indicator 0 // payload_unit_start_indicator @@ -56,6 +60,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { // transport_scrambling_control 0 // adaptation_field_control // continuity_counter + // ------------------------------ packet[0] = syncByte // sync_byte if first { @@ -72,7 +77,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { if first { if frame.key { // 关键帧的首个packet需要添加Adaptation - // -----Adaptation----- + // -----Adaptation----------------------- // adaptation_field_length // discontinuity_indicator 0 // random_access_indicator 1 @@ -85,6 +90,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { // program_clock_reference_base // reserved // program_clock_reference_extension + // -------------------------------------- packet[3] |= 0x20 // adaptation_field_control 设置Adaptation packet[4] = 7 // adaptation_field_length packet[5] = 0x50 // random_access_indicator + PCR_flag @@ -93,7 +99,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { } // 帧的首个packet需要添加PES Header - // -----PES Header----- + // -----PES Header------------ // packet_start_code_prefix // stream_id // PES_packet_length @@ -111,6 +117,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { // PES_CRC_flag 0 // PES_extension_flag 0 // PES_header_data_length + // --------------------------- packet[wpos] = 0x00 // packet_start_code_prefix packet[wpos+1] = 0x00 // packet[wpos+2] = 0x01 // @@ -147,7 +154,6 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { if frame.pts != frame.dts { mpegtsWritePTS(packet[wpos:], 1, frame.dts+delay) wpos += 5 - //nazalog.Debugf("%d %d", (frame.pts)/90, (frame.dts)/90) } first = false @@ -212,10 +218,18 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { lpos = rpos } - mpegtsWriteFile(fp, packet) + f.writeFile(packet) } } +func (f *FragmentOP) CloseFile() { + _ = f.fp.Close() +} + +func (f *FragmentOP) writeFile(b []byte) { + _, _ = f.fp.Write(b) +} + func mpegtsdWritePCR(out []byte, pcr uint64) { out[0] = uint8(pcr >> 25) out[1] = uint8(pcr >> 17) @@ -238,11 +252,3 @@ func mpegtsWritePTS(out []byte, fb uint8, pts uint64) { out[3] = uint8(val >> 8) out[4] = uint8(val) } - -func mpegtsWriteFile(fp *os.File, b []byte) { - _, _ = fp.Write(b) -} - -func mpegtsCloseFile(fp *os.File) { - _ = fp.Close() -} diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index a789ecea..bd65d99f 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -9,27 +9,27 @@ package hls // TODO -// package hls处于开发中阶段,请不要使用,第一步计划 -// - 不提供各种配置项 -// - 只支持H264和AAC -// - 先参照nginx rtmp module把功能实现,再做重构 -// +// - 支持HEVC // - 检查所有的容错处理,是否会出现 +// - 补充单元测试 // - 配置项 -// - web服务 -// - 清理文件 +// - Server +// - Dispose +// - 超时时间 +// - 测试windows平台 +// - safari直接播放不了,vlc和ffplay是可以的 // https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist // https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction // #EXTM3U // 固定串 // #EXT-X-VERSION:3 // 固定串 -// #EXT-X-MEDIA-SEQUENCE // m3u8文件中,第一个TS文件的序号 +// #EXT-X-MEDIA-SEQUENCE // // #EXT-X-TARGETDURATION // 所有TS文件,最长的时长 // #EXT-X-PLAYLIST-TYPE: EVENT // #EXT-X-DISCONTINUITY // -// #EXTINF: // 时长 以及TS文件名 +// #EXTINF: // 时长以及TS文件名 -// 重构时,需要统一项目中数据的命名,比如,进来的数据称为Frame帧,188字节的封装称为TSPacket包,TS文件称为Fragment +// 进来的数据称为Frame帧,188字节的封装称为TSPacket包,TS文件称为Fragment // 每个TS文件都以固定的PAT,PMT开始 var FixedFragmentHeader = []byte{ @@ -154,13 +154,23 @@ const ( delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用? // TODO chef 这些在配置项中提供 - outPath = "/tmp/lal/hls/" // 切片文件输出目录 - fraglen = 5000 // 单个TS时长,单位毫秒 - playlen = 30000 // m3u8列表时长 - maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长,强制切割新的fragment,单位毫秒 * 90 - negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90 - winfrags = playlen / fraglen // 多少个TS文件 - maxAudioDelay uint64 = 300 // 单位毫秒 - audioBufSize = 1024 * 1024 - Sync = 2 + negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90 + maxAudioDelay uint64 = 300 // 单位毫秒 + + appName = "hls" ) + +func SplitFragment2TSPackets(content []byte) (ret [][]byte) { + if len(content)%188 != 0 { + return + } + for { + if len(content) == 0 { + break + } + + ret = append(ret, content[0:188]) + content = content[188:] + } + return +} diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go new file mode 100644 index 00000000..553c1618 --- /dev/null +++ b/pkg/hls/muxer.go @@ -0,0 +1,405 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "bytes" + "fmt" + "os" + + "github.com/q191201771/lal/pkg/aac" + + "github.com/q191201771/lal/pkg/rtmp" + "github.com/q191201771/naza/pkg/bele" + "github.com/q191201771/naza/pkg/nazalog" +) + +// 记录fragment的一些信息,注意,写m3u8文件时可能还需要用到历史fragment的信息 +type fragmentInfo struct { + id int // fragment的自增序号 + duration float64 // 当前fragment中数据的时长,单位秒 + discont bool // #EXT-X-DISCONTINUITY +} + +type MuxerConfig struct { + OutPath string `json:"out_path"` + FragmentDurationMS int `json:"fragment_duration_ms"` + FragmentNum int `json:"fragment_num"` +} + +type Muxer struct { + streamName string + outPath string + playlistFilename string + playlistFilenameBak string + + config *MuxerConfig + + fragmentOP FragmentOP + opened bool + adts aac.ADTS + spspps []byte + videoCC uint8 + audioCC uint8 + videoOut []byte // 帧 + + fragTS uint64 // 新建立fragment时的时间戳,毫秒 * 90 + + nfrags int // 大序号,增长到winfrags后,就增长frag + frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 + frags []fragmentInfo // TS文件的环形队列,记录TS的信息,比如写M3U8文件时要用 2 * winfrags + 1 + + aaframe []byte + aframePTS uint64 // 最新音频帧的时间戳 +} + +func NewMuxer(streamName string, config *MuxerConfig) *Muxer { + op := getMuxerOutPath(config.OutPath, streamName) + playlistFilename := getM3U8Filename(op, streamName) + playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) + videoOut := make([]byte, 1024*1024) + videoOut = videoOut[0:0] + frags := make([]fragmentInfo, 2*config.FragmentNum+1) // TODO chef: 为什么是 * 2 + 1 + return &Muxer{ + streamName: streamName, + outPath: op, + playlistFilename: playlistFilename, + playlistFilenameBak: playlistFilenameBak, + config: config, + videoOut: videoOut, + aaframe: nil, + frags: frags, + } +} + +func (m *Muxer) Start() { + nazalog.Infof("start hls muxer. streamName=%s", m.streamName) + m.ensureDir() +} + +func (m *Muxer) Stop() { + nazalog.Infof("stop hls muxer. streamName=%s", m.streamName) + m.flushAudio() + m.closeFragment() +} + +func (m *Muxer) FeedRTMPMessage(msg rtmp.AVMsg) { + switch msg.Header.MsgTypeID { + case rtmp.TypeidAudio: + m.feedAudio(msg) + case rtmp.TypeidVideo: + m.feedVideo(msg) + } +} + +func (m *Muxer) feedVideo(msg rtmp.AVMsg) { + if msg.Payload[0]&0xF != 7 { + // TODO chef: HLS视频现在只做了h264的支持 + return + } + + ftype := msg.Payload[0] & 0xF0 >> 4 + htype := msg.Payload[1] + + if ftype == 1 && htype == 0 { + m.cacheSPSPPS(msg) + return + } + + cts := bele.BEUint24(msg.Payload[2:]) + + audSent := false + spsppsSent := false + // 优化这块buffer + out := m.videoOut[0:0] + for i := 5; i != len(msg.Payload); { + nalBytes := int(bele.BEUint32(msg.Payload[i:])) + i += 4 + srcNalType := msg.Payload[i] + nalType := srcNalType & 0x1F + + //nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) + + if nalType >= 7 && nalType <= 9 { + nazalog.Warn("should not reach here.") + i += nalBytes + continue + } + + if !audSent { + switch nalType { + case 1, 5, 6: + out = append(out, audNal...) + audSent = true + case 9: + audSent = true + } + } + + switch nalType { + case 1: + spsppsSent = false + case 5: + if !spsppsSent { + out = m.appendSPSPPS(out) + } + spsppsSent = true + + } + + if len(out) == 0 { + out = append(out, nalStartCode...) + } else { + out = append(out, nalStartCode3...) + } + out = append(out, msg.Payload[i:i+nalBytes]...) + + i += nalBytes + } + + var frame mpegTSFrame + frame.cc = m.videoCC + frame.dts = uint64(msg.Header.TimestampAbs) * 90 + frame.pts = frame.dts + uint64(cts)*90 + frame.pid = PidVideo + frame.sid = streamIDVideo + frame.key = ftype == 1 + + boundary := frame.key && (!m.opened || m.adts.IsNil() || m.aaframe != nil) + + m.updateFragment(frame.dts, boundary, 1) + + if !m.opened { + nazalog.Warn("not opened.") + return + } + + m.fragmentOP.WriteFrame(&frame, out) + m.videoCC = frame.cc +} + +func (m *Muxer) feedAudio(msg rtmp.AVMsg) { + if msg.Payload[0]>>4 != 10 { + // TODO chef: HLS音频现在只做了h264的支持 + return + } + + if msg.Payload[1] == 0 { + m.cacheAACSeqHeader(msg) + return + } + + pts := uint64(msg.Header.TimestampAbs) * 90 + + m.updateFragment(pts, m.spspps == nil, 2) + + if m.aaframe == nil { + m.aframePTS = pts + } + + adtsHeader := m.adts.GetADTS(uint16(msg.Header.MsgLen)) + m.aaframe = append(m.aaframe, adtsHeader...) + m.aaframe = append(m.aaframe, msg.Payload[2:]...) + +} + +func (m *Muxer) cacheAACSeqHeader(msg rtmp.AVMsg) { + m.adts.PutAACSequenceHeader(msg.Payload) +} + +func (m *Muxer) cacheSPSPPS(msg rtmp.AVMsg) { + m.spspps = msg.Payload +} + +func (m *Muxer) appendSPSPPS(out []byte) []byte { + index := 10 + nnals := m.spspps[index] & 0x1f + index++ + for n := 0; ; n++ { + for ; nnals != 0; nnals-- { + length := int(bele.BEUint16(m.spspps[index:])) + index += 2 + out = append(out, nalStartCode...) + out = append(out, m.spspps[index:index+length]...) + index += length + } + + if n == 1 { + break + } + nnals = m.spspps[index] + index++ + } + return out +} + +func (m *Muxer) updateFragment(ts uint64, boundary bool, flushRate int) { + force := false + discont := true + var f *fragmentInfo + + if m.opened { + f = m.getFrag(m.nfrags) + + // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment + maxfraglen := uint64(m.config.FragmentDurationMS * 90 * 10) + if (ts > m.fragTS && ts-m.fragTS > maxfraglen) || (m.fragTS > ts && m.fragTS-ts > negMaxfraglen) { + nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", m.fragTS, ts) + force = true + } else { + // TODO chef: 考虑ts比fragTS小的情况 + f.duration = float64(ts-m.fragTS) / 90000 + discont = false + } + } + + // 时长超过设置的ts文件切片阈值才行 + if f != nil && f.duration < float64(m.config.FragmentDurationMS)/1000 { + boundary = false + } + + // 开启新的fragment + if boundary || force { + m.closeFragment() + m.openFragment(ts, discont) + } + + // 音频已经缓存了一定时长的数据了,需要落盘了 + if m.opened && m.aaframe != nil && ((m.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) { + m.flushAudio() + } +} + +func (m *Muxer) openFragment(ts uint64, discont bool) { + if m.opened { + return + } + + id := m.getFragmentID() + + filename := getTSFilename(m.outPath, m.streamName, id) + m.fragmentOP.OpenFile(filename) + m.opened = true + + frag := m.getFrag(m.nfrags) + frag.discont = discont + frag.id = id + + m.fragTS = ts + + m.flushAudio() +} + +func (m *Muxer) closeFragment() { + if !m.opened { + return + } + + m.fragmentOP.CloseFile() + + m.opened = false + + m.nextFrag() + + m.writePlaylist() +} + +func (m *Muxer) writePlaylist() { + fp, err := os.Create(m.playlistFilenameBak) + nazalog.Assert(nil, err) + + // 找出时长最长的fragment + maxFrag := float64(m.config.FragmentDurationMS) / 1000 + for i := 0; i < m.nfrags; i++ { + frag := m.getFrag(i) + if frag.duration > maxFrag { + maxFrag = frag.duration + 0.5 + } + } + + // TODO chef 优化这块buffer的构造 + var buf bytes.Buffer + buf.WriteString("#EXTM3U\n") + buf.WriteString("#EXT-X-VERSION:3\n") + buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n") + buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag))) + buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.frag)) + + for i := 0; i < m.nfrags; i++ { + frag := m.getFrag(i) + + if frag.discont { + buf.WriteString("#EXT-X-DISCONTINUITY\n") + } + + buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, getTSFilenameWithoutPath(m.streamName, frag.id))) + } + + _, err = fp.Write(buf.Bytes()) + nazalog.Assert(nil, err) + _ = fp.Close() + err = os.Rename(m.playlistFilenameBak, m.playlistFilename) + nazalog.Assert(nil, err) +} + +// 创建文件夹,如果文件夹已经存在,老的文件夹会被删除 +func (m *Muxer) ensureDir() { + err := os.RemoveAll(m.outPath) + nazalog.Assert(nil, err) + err = os.MkdirAll(m.outPath, 0777) + nazalog.Assert(nil, err) +} + +func (m *Muxer) getFragmentID() int { + return m.frag + m.nfrags +} + +func (m *Muxer) getFrag(n int) *fragmentInfo { + return &m.frags[(m.frag+n)%(m.config.FragmentNum*2+1)] +} + +// TODO chef: 这个函数重命名为incr更好些 +func (m *Muxer) nextFrag() { + if m.nfrags == m.config.FragmentNum { + m.frag++ + } else { + m.nfrags++ + } +} + +// 将音频数据落盘的几种情况: +// 1. open fragment时,如果aframe中还有数据 +// 2. update fragment时,判断音频的时间戳 +// 3. 音频队列长度过长时 +// 4. 流关闭时 +func (m *Muxer) flushAudio() { + if !m.opened { + nazalog.Warn("flushAudio by not opened.") + return + } + + if m.aaframe == nil { + nazalog.Warn("flushAudio by aframe is nil.") + return + } + + frame := &mpegTSFrame{ + pts: m.aframePTS, + dts: m.aframePTS, + pid: PidAudio, + sid: streamIDAudio, + cc: m.audioCC, + key: false, + } + + m.fragmentOP.WriteFrame(frame, m.aaframe) + + m.audioCC = frame.cc + m.aaframe = nil +} diff --git a/pkg/hls/path.go b/pkg/hls/path.go new file mode 100644 index 00000000..ade641e9 --- /dev/null +++ b/pkg/hls/path.go @@ -0,0 +1,77 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "fmt" + "io/ioutil" + "strings" +) + +// 本文件聚合以下功能: +// - 生成HLS(m3u8文件+ts文件)时,文件命名规则,以及文件存放规则 +// - HTTP请求HLS时,request URI和文件路径的映射规则 + +// HTTP请求URI格式,已经文件路径的映射规则 +// +// 假设 +// 流名称="test110" +// rootPath="/tmp/lal/hls/" +// +// 则 +// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8 +// http://127.0.0.1:8081/hls/test110/test110-0.ts -> /tmp/lal/hls/test110/test110-0.ts + +type requestInfo struct { + fileName string + streamName string + fileType string +} + +// RequestURI example: +// uri -> fileName streamName fileType +// http://127.0.0.1:8081/hls/test110/playlist.m3u8 -> playlist.m3u8 test110 m3u8 +// http://127.0.0.1:8081/hls/test110/test110-0.ts -> test110-0.ts test110 ts +func parseRequestInfo(uri string) (ri requestInfo) { + ss := strings.Split(uri, "/") + if len(ss) < 2 { + return + } + ri.streamName = ss[len(ss)-2] + ri.fileName = ss[len(ss)-1] + + ss = strings.Split(ri.fileName, ".") + if len(ss) < 2 { + return + } + ri.fileType = ss[len(ss)-1] + + return +} + +func readFileContent(rootOutPath string, ri requestInfo) ([]byte, error) { + filename := fmt.Sprintf("%s%s/%s", rootOutPath, ri.streamName, ri.fileName) + return ioutil.ReadFile(filename) +} + +func getMuxerOutPath(rootOutPath string, streamName string) string { + return fmt.Sprintf("%s%s/", rootOutPath, streamName) +} + +func getM3U8Filename(outpath string, streamName string) string { + return fmt.Sprintf("%s%s.m3u8", outpath, "playlist") +} + +func getTSFilename(outpath string, streamName string, id int) string { + return fmt.Sprintf("%s%s-%d.ts", outpath, streamName, id) +} + +func getTSFilenameWithoutPath(streamName string, id int) string { + return fmt.Sprintf("%s-%d.ts", streamName, id) +} diff --git a/pkg/hls/server.go b/pkg/hls/server.go new file mode 100644 index 00000000..c0be7863 --- /dev/null +++ b/pkg/hls/server.go @@ -0,0 +1,72 @@ +// Copyright 2020, Chef. All rights reserved. +// https://github.com/q191201771/lal +// +// Use of this source code is governed by a MIT-style license +// that can be found in the License file. +// +// Author: Chef (191201771@qq.com) + +package hls + +import ( + "net/http" + + "github.com/q191201771/naza/pkg/nazalog" +) + +type Server struct { + addr string + outPath string +} + +func NewServer(addr string, outPath string) *Server { + return &Server{ + addr: addr, + outPath: outPath, + } +} + +func (s *Server) RunLoop() error { + nazalog.Infof("start hls listen. addr=%s", s.addr) + return http.ListenAndServe(s.addr, s) +} + +func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + //nazalog.Debugf("%+v", req) + + // TODO chef: + // - check appname in URI path + // - DIY 404 response body + + ri := parseRequestInfo(req.RequestURI) + //nazalog.Debugf("%+v", ri) + + if ri.fileName == "" || ri.streamName == "" || (ri.fileType != "m3u8" && ri.fileType != "ts") { + nazalog.Warnf("%+v", ri) + resp.WriteHeader(404) + return + } + + content, err := readFileContent(s.outPath, ri) + if err != nil { + nazalog.Warnf("%+v", err) + resp.WriteHeader(404) + return + } + + switch ri.fileType { + case "m3u8": + resp.Header().Add("Content-Type", "application/x-mpegurl") + //resp.Header().Add("Content-Type", "application/vnd.apple.mpegurl") + case "ts": + resp.Header().Add("Content-Type", "video/mp2t") + } + resp.Header().Add("Cache-Control", "no-cache") + //resp.Header().Add("Access-Control-Allow-Origin", "*") + //resp.Header().Add("Access-Control-Allow-Credentials", "true") + //resp.Header().Add("Access-Control-Allow-Methods", "*") + //resp.Header().Add("Access-Control-Allow-Headers", "Content-Type,Access-Token") + //resp.Header().Add("Access-Control-Allow-Expose-Headers", "*") + _, _ = resp.Write(content) + return +} diff --git a/pkg/hls/session.go b/pkg/hls/session.go deleted file mode 100644 index 4db58f3a..00000000 --- a/pkg/hls/session.go +++ /dev/null @@ -1,405 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package hls - -import ( - "bytes" - "encoding/hex" - "fmt" - "os" - - "github.com/q191201771/lal/pkg/aac" - - "github.com/q191201771/lal/pkg/rtmp" - "github.com/q191201771/naza/pkg/bele" - "github.com/q191201771/naza/pkg/nazalog" -) - -type Frag struct { - id uint64 - keyID uint64 - duration float64 // 当前fragment中数据的时长,单位秒 - active bool - discont bool // #EXT-X-DISCONTINUITY -} - -type Session struct { - streamName string - playlistFilename string - playlistFilenameBak string - - adts aac.ADTS - //aacSeqHeader []byte - spspps []byte - videoCC uint8 - audioCC uint8 - opened bool - videoOut []byte // 帧 - fp *os.File - - fragTS uint64 // 新建立fragment时的时间戳,毫秒 * 90 - - nfrags int // 大序号,增长到winfrags后,就增长frag - frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 - frags []Frag // TS文件的环形队列,记录TS的信息,比如写M3U8文件时要用 2 * winfrags + 1 - - aaframe []byte - //aframeBase uint64 // 上一个音频帧的时间戳 - //aframeNum uint64 - aframePTS uint64 // 最新音频帧的时间戳 -} - -func NewSession(streamName string) *Session { - playlistFilename := fmt.Sprintf("%s%s.m3u8", outPath, streamName) - playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) - videoOut := make([]byte, 1024*1024) - videoOut = videoOut[0:0] - frags := make([]Frag, 2*winfrags+1) // TODO chef: 为什么是 * 2 + 1 - return &Session{ - videoOut: videoOut, - aaframe: nil, - frags: frags, - streamName: streamName, - playlistFilename: playlistFilename, - playlistFilenameBak: playlistFilenameBak, - } -} - -func (s *Session) Start() { - -} - -func (s *Session) Stop() { - s.flushAudio() - s.closeFragment() -} - -func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) { - // TODO chef: to be continued - // HLS还没有开发完 - return - switch msg.Header.MsgTypeID { - case rtmp.TypeidAudio: - s.feedAudio(msg) - case rtmp.TypeidVideo: - s.feedVideo(msg) - } -} - -func (s *Session) feedVideo(msg rtmp.AVMsg) { - if msg.Payload[0]&0xF != 7 { - // TODO chef: HLS视频现在只做了h264的支持 - return - } - - ftype := msg.Payload[0] & 0xF0 >> 4 - htype := msg.Payload[1] - - if ftype == 1 && htype == 0 { - s.cacheSPSPPS(msg) - return - } - - cts := bele.BEUint24(msg.Payload[2:]) - - audSent := false - spsppsSent := false - // 优化这块buffer - out := s.videoOut[0:0] - for i := 5; i != len(msg.Payload); { - nalBytes := int(bele.BEUint32(msg.Payload[i:])) - i += 4 - srcNalType := msg.Payload[i] - nalType := srcNalType & 0x1F - - //nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) - - if nalType >= 7 && nalType <= 9 { - nazalog.Warn("should not reach here.") - i += nalBytes - continue - } - - if !audSent { - switch nalType { - case 1, 5, 6: - out = append(out, audNal...) - audSent = true - case 9: - audSent = true - } - } - - switch nalType { - case 1: - spsppsSent = false - case 5: - if !spsppsSent { - out = s.appendSPSPPS(out) - } - spsppsSent = true - - } - - if len(out) == 0 { - out = append(out, nalStartCode...) - } else { - out = append(out, nalStartCode3...) - } - out = append(out, msg.Payload[i:i+nalBytes]...) - - i += nalBytes - } - - var frame MPEGTSFrame - frame.cc = s.videoCC - frame.dts = uint64(msg.Header.TimestampAbs) * 90 - frame.pts = frame.dts + uint64(cts)*90 - frame.pid = PidVideo - frame.sid = streamIDVideo - frame.key = ftype == 1 - - boundary := frame.key && (!s.opened || s.adts.IsNil() || s.aaframe != nil) - - s.updateFragment(frame.dts, boundary, 1) - - if !s.opened { - nazalog.Warn("not opened.") - return - } - - mpegtsWriteFrame(s.fp, &frame, out) - s.videoCC = frame.cc -} - -func (s *Session) feedAudio(msg rtmp.AVMsg) { - if msg.Payload[0]>>4 != 10 { - // TODO chef: HLS音频现在只做了h264的支持 - return - } - - if msg.Payload[1] == 0 { - s.cacheAACSeqHeader(msg) - return - } - - pts := uint64(msg.Header.TimestampAbs) * 90 - - s.updateFragment(pts, s.spspps == nil, 2) - - if s.aaframe == nil { - s.aframePTS = pts - } - - adtsHeader := s.adts.GetADTS(uint16(msg.Header.MsgLen)) - s.aaframe = append(s.aaframe, adtsHeader...) - s.aaframe = append(s.aaframe, msg.Payload[2:]...) - -} - -func (s *Session) cacheAACSeqHeader(msg rtmp.AVMsg) { - nazalog.Debug("cacheAACSeqHeader") - s.adts.PutAACSequenceHeader(msg.Payload) -} - -func (s *Session) cacheSPSPPS(msg rtmp.AVMsg) { - nazalog.Debugf("cacheSPSPPS. %s", hex.Dump(msg.Payload)) - s.spspps = msg.Payload -} - -func (s *Session) appendSPSPPS(out []byte) []byte { - index := 10 - nnals := s.spspps[index] & 0x1f - index++ - nazalog.Debugf("SPS number: %d", nnals) - for n := 0; ; n++ { - for ; nnals != 0; nnals-- { - len := int(bele.BEUint16(s.spspps[index:])) - nazalog.Debugf("header NAL length:%d", len) - index += 2 - out = append(out, nalStartCode...) - out = append(out, s.spspps[index:index+len]...) - index += len - } - - if n == 1 { - break - } - nnals = s.spspps[index] - nazalog.Debugf("PPS number: %d", nnals) - index++ - } - return out -} - -func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) { - force := false - discont := true - var f *Frag - - if s.opened { - f = s.getFrag(s.nfrags) - - // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment - if (ts > s.fragTS && ts-s.fragTS > maxfraglen) || (s.fragTS > ts && s.fragTS-ts > negMaxfraglen) { - nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", s.fragTS, ts) - force = true - } else { - // TODO chef: 考虑ts比fragTS小的情况 - f.duration = float64(ts-s.fragTS) / 90000 - discont = false - } - } - - // 时长超过设置的ts文件切片阈值才行 - if f != nil && f.duration < fraglen/float64(1000) { - boundary = false - } - - // 开启新的fragment - if boundary || force { - s.closeFragment() - s.openFragment(ts, discont) - } - - // 音频已经缓存了一定时长的数据了,需要落盘了 - //nazalog.Debugf("CHEFERASEME 05191839, flush_rate=%d, size=%d, aframe_pts=%d, ts=%d", - // flushRate, len(s.aaframe), s.aframePTS, ts) - if s.opened && s.aaframe != nil && ((s.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) { - //nazalog.Debugf("CHEFERASEME 05191839.") - s.flushAudio() - } -} - -func (s *Session) openFragment(ts uint64, discont bool) { - if s.opened { - return - } - - s.ensureDir() - id := s.getFragmentID() - - filename := fmt.Sprintf("%s%s-%d.ts", outPath, s.streamName, id) - s.fp = mpegtsOpenFile(filename) - s.opened = true - - frag := s.getFrag(s.nfrags) - frag.active = true - frag.discont = discont - frag.id = uint64(id) - - s.fragTS = ts - - s.flushAudio() -} - -func (s *Session) closeFragment() { - if !s.opened { - return - } - - mpegtsCloseFile(s.fp) - - s.opened = false - - s.nextFrag() - - s.writePlaylist() - -} - -func (s *Session) writePlaylist() { - fp, err := os.Create(s.playlistFilenameBak) - nazalog.Assert(nil, err) - - // 找出时长最长的fragment - maxFrag := float64(fraglen / 1000) - for i := 0; i < s.nfrags; i++ { - frag := s.getFrag(i) - if frag.duration > maxFrag { - maxFrag = frag.duration + 0.5 - } - } - - // TODO chef 优化这块buffer的构造 - var buf bytes.Buffer - buf.WriteString("#EXTM3U\n") - buf.WriteString("#EXT-X-VERSION:3\n") - buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n", s.frag)) - buf.WriteString(fmt.Sprintf("#EXT-X-TARGETRATION:%d\n", int(maxFrag))) - - for i := 0; i < s.nfrags; i++ { - frag := s.getFrag(i) - - if frag.discont { - buf.WriteString("#EXT-X-DISCONTINUITY\n") - } - - buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s-%d.ts\n", frag.duration, s.streamName, frag.id)) - } - - _, err = fp.Write(buf.Bytes()) - nazalog.Assert(nil, err) - _ = fp.Close() - err = os.Rename(s.playlistFilenameBak, s.playlistFilename) - nazalog.Assert(nil, err) -} - -func (s *Session) ensureDir() { - err := os.MkdirAll(outPath, 0777) - nazalog.Assert(nil, err) -} - -func (s *Session) getFragmentID() int { - return s.frag + s.nfrags -} - -func (s *Session) getFrag(n int) *Frag { - return &s.frags[(s.frag+n)%(winfrags*2+1)] -} - -// TODO chef: 这个函数重命名为incr更好些 -func (s *Session) nextFrag() { - if s.nfrags == winfrags { - s.frag++ - } else { - s.nfrags++ - } -} - -// 将音频数据落盘的几种情况: -// 1. open fragment时,如果aframe中还有数据 -// 2. update fragment时,判断音频的时间戳 -// 3. 音频队列长度过长时 -// 4. 流关闭时 -func (s *Session) flushAudio() { - if !s.opened { - nazalog.Warn("flushAudio by not opened.") - return - } - - if s.aaframe == nil { - nazalog.Warn("flushAudio by aframe is nil.") - return - } - - frame := &MPEGTSFrame{ - pts: s.aframePTS, - dts: s.aframePTS, - pid: PidAudio, - sid: streamIDAudio, - cc: s.audioCC, - key: false, - } - - mpegtsWriteFrame(s.fp, frame, s.aaframe) - - s.audioCC = frame.cc - s.aaframe = nil -} diff --git a/pkg/hls/ts.go b/pkg/hls/ts.go deleted file mode 100644 index 4cd11aef..00000000 --- a/pkg/hls/ts.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2020, Chef. All rights reserved. -// https://github.com/q191201771/lal -// -// Use of this source code is governed by a MIT-style license -// that can be found in the License file. -// -// Author: Chef (191201771@qq.com) - -package hls - -import "github.com/q191201771/naza/pkg/nazalog" - -func SplitTS(content []byte) (ret [][]byte) { - for { - if len(content) < 188 { - nazalog.Assert(0, len(content)) - break - } - - ret = append(ret, content[0:188]) - content = content[188:] - } - return -} diff --git a/pkg/hls/header.go b/pkg/hls/ts_header.go similarity index 100% rename from pkg/hls/header.go rename to pkg/hls/ts_header.go diff --git a/pkg/hls/pat.go b/pkg/hls/ts_pat.go similarity index 100% rename from pkg/hls/pat.go rename to pkg/hls/ts_pat.go diff --git a/pkg/hls/pes.go b/pkg/hls/ts_pes.go similarity index 100% rename from pkg/hls/pes.go rename to pkg/hls/ts_pes.go diff --git a/pkg/hls/pmt.go b/pkg/hls/ts_pmt.go similarity index 100% rename from pkg/hls/pmt.go rename to pkg/hls/ts_pmt.go diff --git a/pkg/logic/config.go b/pkg/logic/config.go index ca3b6792..8239355d 100644 --- a/pkg/logic/config.go +++ b/pkg/logic/config.go @@ -8,9 +8,12 @@ package logic +import "github.com/q191201771/lal/pkg/hls" + type Config struct { RTMP RTMP `json:"rtmp"` HTTPFLV HTTPFLV `json:"httpflv"` + HLS HLS `json:"hls"` } type RTMP struct { @@ -22,3 +25,8 @@ type HTTPFLV struct { SubListenAddr string `json:"sub_listen_addr"` GOPNum int `json:"gop_num"` } + +type HLS struct { + SubListenAddr string `json:"sub_listen_addr"` + *hls.MuxerConfig +} diff --git a/pkg/logic/example_test.go b/pkg/logic/example_test.go index 8b78a4d1..1b0f8a1f 100644 --- a/pkg/logic/example_test.go +++ b/pkg/logic/example_test.go @@ -17,6 +17,8 @@ import ( "testing" "time" + "github.com/q191201771/lal/pkg/hls" + "github.com/q191201771/lal/pkg/logic" "github.com/q191201771/lal/pkg/httpflv" @@ -34,6 +36,7 @@ var ( rtmpAddr = ":19350" httpflvAddr = ":8080" + hlsAddr = ":10001" rFLVFileName = "testdata/test.flv" wFLVPullFileName = "testdata/flvpull.flv" @@ -67,6 +70,14 @@ func TestExample(t *testing.T) { config := logic.Config{ RTMP: logic.RTMP{Addr: rtmpAddr}, HTTPFLV: logic.HTTPFLV{SubListenAddr: httpflvAddr}, + HLS: logic.HLS{ + SubListenAddr: hlsAddr, + MuxerConfig: &hls.MuxerConfig{ + OutPath: "/tmp/lal/hls/", + FragmentDurationMS: 3000, + FragmentNum: 6, + }, + }, } pushURL = fmt.Sprintf("rtmp://127.0.0.1%s/live/11111", config.RTMP.Addr) diff --git a/pkg/logic/group.go b/pkg/logic/group.go index 356ab25b..3a897691 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -25,13 +25,15 @@ type Group struct { appName string streamName string + hlsConfig *hls.MuxerConfig + exitChan chan struct{} mutex sync.Mutex pubSession *rtmp.ServerSession rtmpSubSessionSet map[*rtmp.ServerSession]struct{} httpflvSubSessionSet map[*httpflv.SubSession]struct{} - hlsSession *hls.Session + hlsMuxer *hls.Muxer gopCache *GOPCache // TODO chef: 如果没有开启httpflv监听,可以不做格式转换,节约CPU资源 httpflvGopCache *GOPCache @@ -39,13 +41,14 @@ type Group struct { var _ rtmp.PubSessionObserver = &Group{} -func NewGroup(appName string, streamName string, rtmpGOPNum int, httpflvGOPNum int) *Group { +func NewGroup(appName string, streamName string, rtmpGOPNum int, httpflvGOPNum int, hlsConfig *hls.MuxerConfig) *Group { uk := unique.GenUniqueKey("GROUP") log.Infof("lifecycle new group. [%s] appName=%s, streamName=%s", uk, appName, streamName) return &Group{ UniqueKey: uk, appName: appName, streamName: streamName, + hlsConfig: hlsConfig, exitChan: make(chan struct{}, 1), rtmpSubSessionSet: make(map[*rtmp.ServerSession]struct{}), httpflvSubSessionSet: make(map[*httpflv.SubSession]struct{}), @@ -84,8 +87,8 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool { } group.pubSession = session - group.hlsSession = hls.NewSession(group.streamName) - group.hlsSession.Start() + group.hlsMuxer = hls.NewMuxer(group.streamName, group.hlsConfig) + group.hlsMuxer.Start() group.mutex.Unlock() session.SetPubSessionObserver(group) @@ -97,7 +100,7 @@ func (group *Group) DelRTMPPubSession(session *rtmp.ServerSession) { group.mutex.Lock() defer group.mutex.Unlock() group.pubSession = nil - group.hlsSession.Stop() + group.hlsMuxer.Stop() group.gopCache.Clear() group.httpflvGopCache.Clear() @@ -156,7 +159,7 @@ func (group *Group) OnReadRTMPAVMsg(msg rtmp.AVMsg) { //log.Debugf("%+v, %02x, %02x", msg.Header, msg.Payload[0], msg.Payload[1]) group.broadcastRTMP(msg) - group.hlsSession.FeedRTMPMessage(msg) + group.hlsMuxer.FeedRTMPMessage(msg) } func (group *Group) broadcastRTMP(msg rtmp.AVMsg) { diff --git a/pkg/logic/server_manager.go b/pkg/logic/server_manager.go index 158b76b2..ed87f4d1 100644 --- a/pkg/logic/server_manager.go +++ b/pkg/logic/server_manager.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/q191201771/lal/pkg/hls" + "github.com/q191201771/lal/pkg/httpflv" "github.com/q191201771/lal/pkg/rtmp" log "github.com/q191201771/naza/pkg/nazalog" @@ -22,6 +24,7 @@ type ServerManager struct { httpflvServer *httpflv.Server rtmpServer *rtmp.Server + hlsServer *hls.Server exitChan chan struct{} mutex sync.Mutex @@ -40,6 +43,9 @@ func NewServerManager(config *Config) *ServerManager { if len(config.RTMP.Addr) != 0 { m.rtmpServer = rtmp.NewServer(m, config.RTMP.Addr) } + if len(config.HLS.SubListenAddr) != 0 { + m.hlsServer = hls.NewServer(config.HLS.SubListenAddr, config.HLS.OutPath) + } return m } @@ -60,6 +66,14 @@ func (sm *ServerManager) RunLoop() { }() } + if sm.hlsServer != nil { + go func() { + if err := sm.hlsServer.RunLoop(); err != nil { + log.Error(err) + } + }() + } + t := time.NewTicker(1 * time.Second) defer t.Stop() var count uint32 @@ -168,7 +182,7 @@ func (sm *ServerManager) check() { func (sm *ServerManager) getOrCreateGroup(appName string, streamName string) *Group { group, exist := sm.groupMap[streamName] if !exist { - group = NewGroup(appName, streamName, sm.config.RTMP.GOPNum, sm.config.HTTPFLV.GOPNum) + group = NewGroup(appName, streamName, sm.config.RTMP.GOPNum, sm.config.HTTPFLV.GOPNum, sm.config.HLS.MuxerConfig) sm.groupMap[streamName] = group } go group.RunLoop() diff --git a/pkg/rtmp/amf0.go b/pkg/rtmp/amf0.go index e98a4fb5..f298d60e 100644 --- a/pkg/rtmp/amf0.go +++ b/pkg/rtmp/amf0.go @@ -214,6 +214,10 @@ func (amf0) ReadNull(b []byte) (int, error) { return 1, nil } +// TODO chef: +// 考虑将map改成数组 +// - Go的map是顺序随机的,使用map也即丢失了原始数据的顺序性 +// - 如果Object中存在key相同的情况(先不讨论是否合法,如果业务方非要这么用),可能会覆盖导致丢失 func (amf0) ReadObject(b []byte) (map[string]interface{}, int, error) { if len(b) < 1 { return nil, 0, ErrAMFTooShort diff --git a/pkg/rtmp/handshake.go b/pkg/rtmp/handshake.go index 4b681ae5..86e1fcd4 100644 --- a/pkg/rtmp/handshake.go +++ b/pkg/rtmp/handshake.go @@ -19,6 +19,8 @@ import ( log "github.com/q191201771/naza/pkg/nazalog" ) +// https://pengrl.com/p/20027 + const version = uint8(3) const ( diff --git a/pkg/rtmp/stream.go b/pkg/rtmp/stream.go index b7ae5cc4..4c6a5f51 100644 --- a/pkg/rtmp/stream.go +++ b/pkg/rtmp/stream.go @@ -28,6 +28,7 @@ type Header struct { TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳 } +// TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中 type StreamMsg struct { buf []byte b uint32