diff --git a/.gitignore b/.gitignore index 6ee1b2ad..a392ef43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ profile.out coverage.html +*.aac +*.h264 /pre-commit.sh /coverage.txt diff --git a/app/tscmp/tscmp.go b/app/tscmp/tscmp.go index 2ca6e32c..1f967e22 100644 --- a/app/tscmp/tscmp.go +++ b/app/tscmp/tscmp.go @@ -19,13 +19,13 @@ import ( // 比较两个TS文件,注意,该程序还没有写完 -var filename1 = "/Volumes/Data/lal-0.ts" -var filename2 = "/Volumes/Data/nrm-0.ts" +var filename1 = "/Volumes/Data/tmp/lal-4.ts" +var filename2 = "/Volumes/Data/tmp/nrm-4.ts" -func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) { +func skipPacketFilter(tss [][]byte) (ret [][]byte) { for _, ts := range tss { h := hls.ParseTSPacketHeader(ts) - if h.Pid == uint16(257) { + if h.Pid == hls.PidAudio { continue } ret = append(ret, ts) @@ -36,6 +36,24 @@ func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) { func parsePacket(packet []byte) { h := hls.ParseTSPacketHeader(packet) nazalog.Debugf("%+v", h) + index := 4 + + var adaptation hls.TSPacketAdaptation + switch h.Adaptation { + case hls.AdaptationFieldControlNo: + // noop + case hls.AdaptationFieldControlFollowed: + adaptation = hls.ParseTSPacketAdaptation(packet[4:]) + index++ + default: + nazalog.Warn(h.Adaptation) + } + index += int(adaptation.Length) + + if h.PayloadUnitStart == 1 && h.Pid == 256 { + pes, length := hls.ParsePES(packet[index:]) + nazalog.Debugf("%+v, %d", pes, length) + } } func main() { @@ -50,10 +68,10 @@ func main() { nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) - tss1 = skipAudioPacketFilter(tss1) - tss2 = skipAudioPacketFilter(tss2) + //tss1 = skipPacketFilter(tss1) + //tss2 = skipPacketFilter(tss2) - nazalog.Debugf("after skip audio. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) + nazalog.Debugf("after skip. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2)) m := len(tss1) if m > len(tss2) { diff --git a/go.mod b/go.mod index ef9ef5ed..e8f9204a 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/q191201771/lal go 1.12 -require github.com/q191201771/naza v0.12.1 +require github.com/q191201771/naza v0.12.2 diff --git a/go.sum b/go.sum index f4463ed8..044e43a8 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,2 @@ -github.com/q191201771/naza v0.12.1 h1:uBJ9mrucssU4i10DrPaA07pnomOk2X7PnBZvP7Z1yXw= -github.com/q191201771/naza v0.12.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= +github.com/q191201771/naza v0.12.2 h1:El5OSCPFrGGrZiyZ0aOvdystC15Ap7lC4MipVKdfVMY= +github.com/q191201771/naza v0.12.2/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM= diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index 315b8c65..0bbe5e9c 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -56,12 +56,14 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) { // <1.6.3.4 channelConfiguration> // -------------------------------------------------------- // audio object type [5b] 2=AAC LC - // samplingFrequencyIndex [4b] 3=48000 + // samplingFrequencyIndex [4b] 3=48000 4=44100 // channelConfiguration [4b] 2=left, right front speakers obj.audioObjectType = br.ReadBits8(5) obj.samplingFrequencyIndex = br.ReadBits8(4) obj.channelConfiguration = br.ReadBits8(4) log.Debugf("%+v", obj) + + obj.adtsHeader = make([]byte, 7) } // 获取 ADTS 头,注意,由于ADTS头依赖包的长度,而每个包的长度不同,所以生成的每个包的 ADTS 头也不同 @@ -90,9 +92,6 @@ func (obj *ADTS) GetADTS(length uint16) []byte { // adts_buffer_fullness [11b] // no_raw_data_blocks_in_frame [2b] - if obj.adtsHeader == nil { - obj.adtsHeader = make([]byte, 7) - } // 减去头两字节,再加上自身adts头的7个字节 length += 5 @@ -111,6 +110,10 @@ func (obj *ADTS) GetADTS(length uint16) []byte { return obj.adtsHeader } +func (obj *ADTS) IsNil() bool { + return obj.adtsHeader == nil +} + // 将 rtmp AAC 传入,输出带 ADTS 头的 AAC ES流 // @param rtmp message payload部分 func CaptureAAC(w io.Writer, payload []byte) { diff --git a/pkg/hls/header.go b/pkg/hls/header.go index 16bfef4b..55003f3f 100644 --- a/pkg/hls/header.go +++ b/pkg/hls/header.go @@ -13,6 +13,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) +// ------------------------------------------------ // <2.4.3.2> // sync_byte [8b] * always 0x47 // transport_error_indicator [1b] @@ -22,6 +23,7 @@ import ( // transport_scrambling_control [2b] // adaptation_field_control [2b] // continuity_counter [4b] * +// ------------------------------------------------ type TSPacketHeader struct { Sync uint8 Err uint8 @@ -33,6 +35,7 @@ type TSPacketHeader struct { CC uint8 } +// ---------------------------------------------------------- // // adaptation_field_length [8b] * 不包括自己这1字节 // discontinuity_indicator [1b] @@ -47,6 +50,7 @@ type TSPacketHeader struct { // program_clock_reference_base [33b] // reserved [6b] // program_clock_reference_extension [9b] ****** +// ---------------------------------------------------------- type TSPacketAdaptation struct { Length uint8 } diff --git a/pkg/hls/hls.go b/pkg/hls/hls.go index 6a77cedc..a789ecea 100644 --- a/pkg/hls/hls.go +++ b/pkg/hls/hls.go @@ -13,8 +13,14 @@ package hls // - 不提供各种配置项 // - 只支持H264和AAC // - 先参照nginx rtmp module把功能实现,再做重构 +// +// - 检查所有的容错处理,是否会出现 +// - 配置项 +// - web服务 +// - 清理文件 // https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist +// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction // #EXTM3U // 固定串 // #EXT-X-VERSION:3 // 固定串 // #EXT-X-MEDIA-SEQUENCE // m3u8文件中,第一个TS文件的序号 @@ -25,7 +31,8 @@ package hls // 重构时,需要统一项目中数据的命名,比如,进来的数据称为Frame帧,188字节的封装称为TSPacket包,TS文件称为Fragment -var FixedTSHeader = []byte{ +// 每个TS文件都以固定的PAT,PMT开始 +var FixedFragmentHeader = []byte{ /* TS */ 0x47, 0x40, 0x00, 0x10, 0x00, /* PSI */ @@ -61,8 +68,8 @@ var FixedTSHeader = []byte{ /* PMT */ 0xe1, 0x00, 0xf0, 0x00, - 0x1b, 0xe1, 0x00, 0xf0, 0x00, /* h264 epid 256 */ - 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ + 0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */ + 0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */ /* CRC */ 0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */ /* stuffing 157 bytes */ @@ -104,7 +111,9 @@ const ( PidPAT uint16 = 0 + // ------------------------------------------ //
+ // ------------------------------------------ AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload @@ -113,20 +122,26 @@ const ( // PMT const ( + // ----------------------------------------------------------------------------- //
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax // 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video + // ----------------------------------------------------------------------------- streamTypeAAC uint8 = 0x0F streamTypeAVC uint8 = 0x1B ) // PES const ( + // ----------------------------------------------------------------- //
- streamIDAudio uint8 = 192 // 110x xxxx + // ----------------------------------------------------------------- + streamIDAudio uint8 = 192 // 110x xxxx 0xC0 streamIDVideo uint8 = 224 // 1110 xxxx + // ------------------------------ // + // ------------------------------ PTSDTSFlags0 uint8 = 0 // no PTS no DTS PTSDTSFlags1 uint8 = 1 // forbidden PTSDTSFlags2 uint8 = 2 // only PTS @@ -134,17 +149,18 @@ const ( ) const ( - pidVideo uint16 = 0x100 - delay uint64 = 63000 // 700 ms PCR delay + PidVideo uint16 = 0x100 + PidAudio uint16 = 0x101 + delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用? // TODO chef 这些在配置项中提供 - outPath = "/tmp/lal/hls/" // 切片文件输出目录 - fraglen = 5000 // 单个TS时长,单位毫秒 - maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长,强制切割新的fragment,单位毫秒 * 90 - negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90 - playlen = 30000 // m3u8列表时长 - winfrags = playlen / fraglen // 多少个TS文件 - maxAudioDelay = 300 - audioBufSize = 1024 * 1024 - Sync = 2 + outPath = "/tmp/lal/hls/" // 切片文件输出目录 + fraglen = 5000 // 单个TS时长,单位毫秒 + playlen = 30000 // m3u8列表时长 + maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长,强制切割新的fragment,单位毫秒 * 90 + negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90 + winfrags = playlen / fraglen // 多少个TS文件 + maxAudioDelay uint64 = 300 // 单位毫秒 + audioBufSize = 1024 * 1024 + Sync = 2 ) diff --git a/pkg/hls/hls_test.go b/pkg/hls/hls_test.go index 33b6f966..692531f5 100644 --- a/pkg/hls/hls_test.go +++ b/pkg/hls/hls_test.go @@ -16,13 +16,13 @@ import ( ) func TestParseFixedTSPacket(t *testing.T) { - h := hls.ParseTSPacketHeader(hls.FixedTSHeader) + h := hls.ParseTSPacketHeader(hls.FixedFragmentHeader) nazalog.Debugf("%+v", h) - pat := hls.ParsePAT(hls.FixedTSHeader[5:]) + pat := hls.ParsePAT(hls.FixedFragmentHeader[5:]) nazalog.Debugf("%+v", pat) - h = hls.ParseTSPacketHeader(hls.FixedTSHeader[188:]) + h = hls.ParseTSPacketHeader(hls.FixedFragmentHeader[188:]) nazalog.Debugf("%+v", h) - pmt := hls.ParsePMT(hls.FixedTSHeader[188+5:]) + pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:]) nazalog.Debugf("%+v", pmt) } diff --git a/pkg/hls/mpegts.go b/pkg/hls/mpegts.go index f95d7b71..bf816131 100644 --- a/pkg/hls/mpegts.go +++ b/pkg/hls/mpegts.go @@ -28,12 +28,12 @@ type MPEGTSFrame struct { func mpegtsOpenFile(filename string) *os.File { fp, err := os.Create(filename) nazalog.Assert(nil, err) - mpegtsWriteFile(fp, FixedTSHeader) + mpegtsWriteFile(fp, FixedFragmentHeader) return fp } func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { - nazalog.Debugf("mpegts: write frame. %+v, size=%d", frame, len(b)) + //nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b)) wpos := 0 // 当前packet的写入位置 lpos := 0 // 当前帧的处理位置 @@ -111,7 +111,6 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { // PES_CRC_flag 0 // PES_extension_flag 0 // PES_header_data_length - nazalog.Debug("write PES.") packet[wpos] = 0x00 // packet_start_code_prefix packet[wpos+1] = 0x00 // packet[wpos+2] = 0x01 // @@ -148,6 +147,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) { if frame.pts != frame.dts { mpegtsWritePTS(packet[wpos:], 1, frame.dts+delay) wpos += 5 + //nazalog.Debugf("%d %d", (frame.pts)/90, (frame.dts)/90) } first = false @@ -239,16 +239,8 @@ func mpegtsWritePTS(out []byte, fb uint8, pts uint64) { out[4] = uint8(val) } -//var debugCount int - func mpegtsWriteFile(fp *os.File, b []byte) { - //nazalog.Debugf("(%d) mpegts: write %d bytes.", debugCount, len(b)) - //debugCount++ - //if debugCount == 60 { - // nazalog.Debugf("%s", hex.Dump(b)) - //} _, _ = fp.Write(b) - _ = fp.Sync() } func mpegtsCloseFile(fp *os.File) { diff --git a/pkg/hls/pat.go b/pkg/hls/pat.go index 0ecd758f..ba515424 100644 --- a/pkg/hls/pat.go +++ b/pkg/hls/pat.go @@ -12,6 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) +// --------------------------------------------------------------------------------------------------- // Program association section // <2.4.4.3> // table_id [8b] * @@ -31,6 +32,7 @@ import ( // program_map_PID [13b] ** if program_number == 0 then network_PID else then program_map_PID // -------------- // CRC_32 [32b] **** +// --------------------------------------------------------------------------------------------------- type PAT struct { tid uint8 ssi uint8 @@ -62,9 +64,9 @@ func ParsePAT(b []byte) (pat PAT) { pat.sn = br.ReadBits8(8) pat.lsn = br.ReadBits8(8) - len := pat.sl - 9 + length := pat.sl - 9 - for i := uint16(0); i < len; i += 4 { + for i := uint16(0); i < length; i += 4 { var ppe PATProgramElement ppe.pn = br.ReadBits16(16) br.ReadBits8(3) diff --git a/pkg/hls/pes.go b/pkg/hls/pes.go index 7038ab5a..6d45b21b 100644 --- a/pkg/hls/pes.go +++ b/pkg/hls/pes.go @@ -12,6 +12,7 @@ import ( "github.com/q191201771/naza/pkg/nazabits" ) +// ----------------------------------------------------------- // // <2.4.3.6 PES packet> //
@@ -33,14 +34,17 @@ import ( // PES_CRC_flag [1b] // PES_extension_flag [1b] * // PES_header_data_length [8b] * +// ----------------------------------------------------------- type PES struct { - pscp uint32 - sid uint8 - ppl uint16 - pad1 uint8 - pdf uint8 - pad2 uint8 - phdl uint8 + pscp uint32 + sid uint8 + ppl uint16 + pad1 uint8 + ptsDtsFlag uint8 + pad2 uint8 + phdl uint8 + pts uint64 + dts uint64 } func ParsePES(b []byte) (pes PES, length int) { @@ -50,12 +54,33 @@ func ParsePES(b []byte) (pes PES, length int) { pes.ppl = br.ReadBits16(16) pes.pad1 = br.ReadBits8(8) - pes.pdf = br.ReadBits8(2) + pes.ptsDtsFlag = br.ReadBits8(2) pes.pad2 = br.ReadBits8(6) pes.phdl = br.ReadBits8(8) br.ReadBytes(uint(pes.phdl)) length = 9 + int(pes.phdl) + // 处理得不是特别标准 + if pes.ptsDtsFlag&0x2 != 0 { + _, pes.pts = readPTS(b[9:]) + } + if pes.ptsDtsFlag&0x1 != 0 { + _, pes.dts = readPTS(b[14:]) + } else { + pes.dts = pes.pts + } + //pes.pts = (pes.pts - delay) / 90 + //pes.dts = (pes.dts - delay) / 90 + + return +} + +// read pts or dts +func readPTS(b []byte) (fb uint8, pts uint64) { + fb = b[0] >> 4 + pts |= uint64((b[0]>>1)&0x07) << 30 + pts |= (uint64(b[1])<<8 | uint64(b[2])) >> 1 << 15 + pts |= (uint64(b[3])<<8 | uint64(b[4])) >> 1 return } diff --git a/pkg/hls/pmt.go b/pkg/hls/pmt.go index cf992793..89dffd10 100644 --- a/pkg/hls/pmt.go +++ b/pkg/hls/pmt.go @@ -13,6 +13,7 @@ import ( "github.com/q191201771/naza/pkg/nazalog" ) +// ---------------------------------------- // Program Map Table // <2.4.4.8> // table_id [8b] * @@ -38,6 +39,7 @@ import ( // ES_info_length_length [12b] ** // -------------- // CRC32 [32b] **** +// ---------------------------------------- type PMT struct { tid uint8 ssi uint8 diff --git a/pkg/hls/session.go b/pkg/hls/session.go index 2da4365b..4db58f3a 100644 --- a/pkg/hls/session.go +++ b/pkg/hls/session.go @@ -9,6 +9,7 @@ package hls import ( + "bytes" "encoding/hex" "fmt" "os" @@ -25,37 +26,48 @@ type Frag struct { keyID uint64 duration float64 // 当前fragment中数据的时长,单位秒 active bool - discont bool + discont bool // #EXT-X-DISCONTINUITY } type Session struct { + streamName string + playlistFilename string + playlistFilenameBak string + adts aac.ADTS //aacSeqHeader []byte spspps []byte videoCC uint8 + audioCC uint8 opened bool videoOut []byte // 帧 fp *os.File fragTS uint64 // 新建立fragment时的时间戳,毫秒 * 90 - frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 + nfrags int // 大序号,增长到winfrags后,就增长frag + frag int // 写入m3u8的EXT-X-MEDIA-SEQUENCE字段 frags []Frag // TS文件的环形队列,记录TS的信息,比如写M3U8文件时要用 2 * winfrags + 1 - aframe []byte - aframeBase uint64 // 上一个音频帧的时间戳 + aaframe []byte + //aframeBase uint64 // 上一个音频帧的时间戳 //aframeNum uint64 - aframePTS uint64 + aframePTS uint64 // 最新音频帧的时间戳 } -func NewSession() *Session { +func NewSession(streamName string) *Session { + playlistFilename := fmt.Sprintf("%s%s.m3u8", outPath, streamName) + playlistFilenameBak := fmt.Sprintf("%s.bak", playlistFilename) videoOut := make([]byte, 1024*1024) - aframe := make([]byte, 1024*1024) + videoOut = videoOut[0:0] frags := make([]Frag, 2*winfrags+1) // TODO chef: 为什么是 * 2 + 1 return &Session{ - videoOut: videoOut, - aframe: aframe, - frags: frags, + videoOut: videoOut, + aaframe: nil, + frags: frags, + streamName: streamName, + playlistFilename: playlistFilename, + playlistFilenameBak: playlistFilenameBak, } } @@ -63,7 +75,14 @@ func (s *Session) Start() { } +func (s *Session) Stop() { + s.flushAudio() + s.closeFragment() +} + func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) { + // TODO chef: to be continued + // HLS还没有开发完 return switch msg.Header.MsgTypeID { case rtmp.TypeidAudio: @@ -73,18 +92,7 @@ func (s *Session) FeedRTMPMessage(msg rtmp.AVMsg) { } } -func (s *Session) Stop() { - -} - -//var debugCount int - func (s *Session) feedVideo(msg rtmp.AVMsg) { - //if debugCount == 3 { - // //os.Exit(0) - //} - //debugCount++ - if msg.Payload[0]&0xF != 7 { // TODO chef: HLS视频现在只做了h264的支持 return @@ -110,7 +118,7 @@ func (s *Session) feedVideo(msg rtmp.AVMsg) { srcNalType := msg.Payload[i] nalType := srcNalType & 0x1F - nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) + //nazalog.Debugf("hls: h264 NAL type=%d, len=%d(%d) cts=%d.", nalType, nalBytes, len(msg.Payload), cts) if nalType >= 7 && nalType <= 9 { nazalog.Warn("should not reach here.") @@ -153,14 +161,19 @@ func (s *Session) feedVideo(msg rtmp.AVMsg) { frame.cc = s.videoCC frame.dts = uint64(msg.Header.TimestampAbs) * 90 frame.pts = frame.dts + uint64(cts)*90 - frame.pid = pidVideo + frame.pid = PidVideo frame.sid = streamIDVideo frame.key = ftype == 1 - //boundary := frame.key && (true || !s.opened) - boundary := frame.key + boundary := frame.key && (!s.opened || s.adts.IsNil() || s.aaframe != nil) s.updateFragment(frame.dts, boundary, 1) + + if !s.opened { + nazalog.Warn("not opened.") + return + } + mpegtsWriteFrame(s.fp, &frame, out) s.videoCC = frame.cc } @@ -180,11 +193,14 @@ func (s *Session) feedAudio(msg rtmp.AVMsg) { s.updateFragment(pts, s.spspps == nil, 2) + if s.aaframe == nil { + s.aframePTS = pts + } + adtsHeader := s.adts.GetADTS(uint16(msg.Header.MsgLen)) - s.aframe = append(s.aframe, adtsHeader...) - s.aframe = append(s.aframe, msg.Payload...) + s.aaframe = append(s.aaframe, adtsHeader...) + s.aaframe = append(s.aaframe, msg.Payload[2:]...) - s.aframePTS = pts } func (s *Session) cacheAACSeqHeader(msg rtmp.AVMsg) { @@ -230,6 +246,7 @@ func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) { if s.opened { f = s.getFrag(s.nfrags) + // 当前时间戳跳跃很大,或者是往回跳跃超过了阈值,强制开启新的fragment if (ts > s.fragTS && ts-s.fragTS > maxfraglen) || (s.fragTS > ts && s.fragTS-ts > negMaxfraglen) { nazalog.Warnf("hls: force fragment split: fragTS=%d, ts=%d", s.fragTS, ts) force = true @@ -240,13 +257,24 @@ func (s *Session) updateFragment(ts uint64, boundary bool, flushRate int) { } } + // 时长超过设置的ts文件切片阈值才行 if f != nil && f.duration < fraglen/float64(1000) { boundary = false } + // 开启新的fragment if boundary || force { + s.closeFragment() s.openFragment(ts, discont) } + + // 音频已经缓存了一定时长的数据了,需要落盘了 + //nazalog.Debugf("CHEFERASEME 05191839, flush_rate=%d, size=%d, aframe_pts=%d, ts=%d", + // flushRate, len(s.aaframe), s.aframePTS, ts) + if s.opened && s.aaframe != nil && ((s.aframePTS + maxAudioDelay*90/uint64(flushRate)) < ts) { + //nazalog.Debugf("CHEFERASEME 05191839.") + s.flushAudio() + } } func (s *Session) openFragment(ts uint64, discont bool) { @@ -257,29 +285,70 @@ func (s *Session) openFragment(ts uint64, discont bool) { s.ensureDir() id := s.getFragmentID() - filename := fmt.Sprintf("%s%d.ts", outPath, id) + filename := fmt.Sprintf("%s%s-%d.ts", outPath, s.streamName, id) s.fp = mpegtsOpenFile(filename) s.opened = true + + frag := s.getFrag(s.nfrags) + frag.active = true + frag.discont = discont + frag.id = uint64(id) + s.fragTS = ts + + s.flushAudio() } func (s *Session) closeFragment() { if !s.opened { - // TODO chef: 关注下是否有这种情况 - nazalog.Assert(true, s.opened) + return } mpegtsCloseFile(s.fp) + s.opened = false + s.nextFrag() s.writePlaylist() - s.opened = false } func (s *Session) writePlaylist() { - // to be continued + fp, err := os.Create(s.playlistFilenameBak) + nazalog.Assert(nil, err) + + // 找出时长最长的fragment + maxFrag := float64(fraglen / 1000) + for i := 0; i < s.nfrags; i++ { + frag := s.getFrag(i) + if frag.duration > maxFrag { + maxFrag = frag.duration + 0.5 + } + } + + // TODO chef 优化这块buffer的构造 + var buf bytes.Buffer + buf.WriteString("#EXTM3U\n") + buf.WriteString("#EXT-X-VERSION:3\n") + buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n", s.frag)) + buf.WriteString(fmt.Sprintf("#EXT-X-TARGETRATION:%d\n", int(maxFrag))) + + for i := 0; i < s.nfrags; i++ { + frag := s.getFrag(i) + + if frag.discont { + buf.WriteString("#EXT-X-DISCONTINUITY\n") + } + + buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s-%d.ts\n", frag.duration, s.streamName, frag.id)) + } + + _, err = fp.Write(buf.Bytes()) + nazalog.Assert(nil, err) + _ = fp.Close() + err = os.Rename(s.playlistFilenameBak, s.playlistFilename) + nazalog.Assert(nil, err) } func (s *Session) ensureDir() { @@ -288,7 +357,7 @@ func (s *Session) ensureDir() { } func (s *Session) getFragmentID() int { - return s.frag + return s.frag + s.nfrags } func (s *Session) getFrag(n int) *Frag { @@ -304,6 +373,33 @@ func (s *Session) nextFrag() { } } -// -// ngx_rtmp_hls_next_frag() 如果nfrags达到了winfrags,则递增frag,否则递增nfrags -// 关闭fragment时调用 +// 将音频数据落盘的几种情况: +// 1. open fragment时,如果aframe中还有数据 +// 2. update fragment时,判断音频的时间戳 +// 3. 音频队列长度过长时 +// 4. 流关闭时 +func (s *Session) flushAudio() { + if !s.opened { + nazalog.Warn("flushAudio by not opened.") + return + } + + if s.aaframe == nil { + nazalog.Warn("flushAudio by aframe is nil.") + return + } + + frame := &MPEGTSFrame{ + pts: s.aframePTS, + dts: s.aframePTS, + pid: PidAudio, + sid: streamIDAudio, + cc: s.audioCC, + key: false, + } + + mpegtsWriteFrame(s.fp, frame, s.aaframe) + + s.audioCC = frame.cc + s.aaframe = nil +} diff --git a/pkg/logic/group.go b/pkg/logic/group.go index d4b74ee3..356ab25b 100644 --- a/pkg/logic/group.go +++ b/pkg/logic/group.go @@ -84,7 +84,7 @@ func (group *Group) AddRTMPPubSession(session *rtmp.ServerSession) bool { } group.pubSession = session - group.hlsSession = hls.NewSession() + group.hlsSession = hls.NewSession(group.streamName) group.hlsSession.Start() group.mutex.Unlock()