Skip to content

Commit

Permalink
[feat] HLS: rtmp音视频数据转换成m3u8+ts格式并落盘部分基本完成了
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed May 23, 2020
1 parent 99ab8df commit d7e7729
Show file tree
Hide file tree
Showing 14 changed files with 252 additions and 92 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
profile.out
coverage.html
*.aac
*.h264

/pre-commit.sh
/coverage.txt
Expand Down
32 changes: 25 additions & 7 deletions app/tscmp/tscmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (

// 比较两个TS文件,注意,该程序还没有写完

var filename1 = "/Volumes/Data/lal-0.ts"
var filename2 = "/Volumes/Data/nrm-0.ts"
var filename1 = "/Volumes/Data/tmp/lal-4.ts"
var filename2 = "/Volumes/Data/tmp/nrm-4.ts"

func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) {
func skipPacketFilter(tss [][]byte) (ret [][]byte) {
for _, ts := range tss {
h := hls.ParseTSPacketHeader(ts)
if h.Pid == uint16(257) {
if h.Pid == hls.PidAudio {
continue
}
ret = append(ret, ts)
Expand All @@ -36,6 +36,24 @@ func skipAudioPacketFilter(tss [][]byte) (ret [][]byte) {
func parsePacket(packet []byte) {
h := hls.ParseTSPacketHeader(packet)
nazalog.Debugf("%+v", h)
index := 4

var adaptation hls.TSPacketAdaptation
switch h.Adaptation {
case hls.AdaptationFieldControlNo:
// noop
case hls.AdaptationFieldControlFollowed:
adaptation = hls.ParseTSPacketAdaptation(packet[4:])
index++
default:
nazalog.Warn(h.Adaptation)
}
index += int(adaptation.Length)

if h.PayloadUnitStart == 1 && h.Pid == 256 {
pes, length := hls.ParsePES(packet[index:])
nazalog.Debugf("%+v, %d", pes, length)
}
}

func main() {
Expand All @@ -50,10 +68,10 @@ func main() {

nazalog.Debugf("num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))

tss1 = skipAudioPacketFilter(tss1)
tss2 = skipAudioPacketFilter(tss2)
//tss1 = skipPacketFilter(tss1)
//tss2 = skipPacketFilter(tss2)

nazalog.Debugf("after skip audio. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))
nazalog.Debugf("after skip. num of ts1=%d, num of ts2=%d", len(tss1), len(tss2))

m := len(tss1)
if m > len(tss2) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/q191201771/lal

go 1.12

require github.com/q191201771/naza v0.12.1
require github.com/q191201771/naza v0.12.2
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/q191201771/naza v0.12.1 h1:uBJ9mrucssU4i10DrPaA07pnomOk2X7PnBZvP7Z1yXw=
github.com/q191201771/naza v0.12.1/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
github.com/q191201771/naza v0.12.2 h1:El5OSCPFrGGrZiyZ0aOvdystC15Ap7lC4MipVKdfVMY=
github.com/q191201771/naza v0.12.2/go.mod h1:SE14GBGO9mAn6JZl3NlfWGtNOT7xQjxOG7f3YOdBThM=
11 changes: 7 additions & 4 deletions pkg/aac/aac.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ func (obj *ADTS) PutAACSequenceHeader(payload []byte) {
// <1.6.3.4 channelConfiguration>
// --------------------------------------------------------
// audio object type [5b] 2=AAC LC
// samplingFrequencyIndex [4b] 3=48000
// samplingFrequencyIndex [4b] 3=48000 4=44100
// channelConfiguration [4b] 2=left, right front speakers
obj.audioObjectType = br.ReadBits8(5)
obj.samplingFrequencyIndex = br.ReadBits8(4)
obj.channelConfiguration = br.ReadBits8(4)
log.Debugf("%+v", obj)

obj.adtsHeader = make([]byte, 7)
}

// 获取 ADTS 头,注意,由于ADTS头依赖包的长度,而每个包的长度不同,所以生成的每个包的 ADTS 头也不同
Expand Down Expand Up @@ -90,9 +92,6 @@ func (obj *ADTS) GetADTS(length uint16) []byte {
// adts_buffer_fullness [11b]
// no_raw_data_blocks_in_frame [2b]

if obj.adtsHeader == nil {
obj.adtsHeader = make([]byte, 7)
}
// 减去头两字节,再加上自身adts头的7个字节
length += 5

Expand All @@ -111,6 +110,10 @@ func (obj *ADTS) GetADTS(length uint16) []byte {
return obj.adtsHeader
}

func (obj *ADTS) IsNil() bool {
return obj.adtsHeader == nil
}

// 将 rtmp AAC 传入,输出带 ADTS 头的 AAC ES流
// @param <payload> rtmp message payload部分
func CaptureAAC(w io.Writer, payload []byte) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/hls/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)

// ------------------------------------------------
// <iso13818-1.pdf> <2.4.3.2> <page 36/174>
// sync_byte [8b] * always 0x47
// transport_error_indicator [1b]
Expand All @@ -22,6 +23,7 @@ import (
// transport_scrambling_control [2b]
// adaptation_field_control [2b]
// continuity_counter [4b] *
// ------------------------------------------------
type TSPacketHeader struct {
Sync uint8
Err uint8
Expand All @@ -33,6 +35,7 @@ type TSPacketHeader struct {
CC uint8
}

// ----------------------------------------------------------
// <iso13818-1.pdf> <Table 2-6> <page 40/174>
// adaptation_field_length [8b] * 不包括自己这1字节
// discontinuity_indicator [1b]
Expand All @@ -47,6 +50,7 @@ type TSPacketHeader struct {
// program_clock_reference_base [33b]
// reserved [6b]
// program_clock_reference_extension [9b] ******
// ----------------------------------------------------------
type TSPacketAdaptation struct {
Length uint8
}
Expand Down
46 changes: 31 additions & 15 deletions pkg/hls/hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ package hls
// - 不提供各种配置项
// - 只支持H264和AAC
// - 先参照nginx rtmp module把功能实现,再做重构
//
// - 检查所有的容错处理,是否会出现
// - 配置项
// - web服务
// - 清理文件

// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/incorporating_ads_into_a_playlist
// https://developer.apple.com/documentation/http_live_streaming/example_playlists_for_http_live_streaming/event_playlist_construction
// #EXTM3U // 固定串
// #EXT-X-VERSION:3 // 固定串
// #EXT-X-MEDIA-SEQUENCE // m3u8文件中,第一个TS文件的序号
Expand All @@ -25,7 +31,8 @@ package hls

// 重构时,需要统一项目中数据的命名,比如,进来的数据称为Frame帧,188字节的封装称为TSPacket包,TS文件称为Fragment

var FixedTSHeader = []byte{
// 每个TS文件都以固定的PAT,PMT开始
var FixedFragmentHeader = []byte{
/* TS */
0x47, 0x40, 0x00, 0x10, 0x00,
/* PSI */
Expand Down Expand Up @@ -61,8 +68,8 @@ var FixedTSHeader = []byte{
/* PMT */
0xe1, 0x00,
0xf0, 0x00,
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* h264 epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
0x1b, 0xe1, 0x00, 0xf0, 0x00, /* avc epid 256 */
0x0f, 0xe1, 0x01, 0xf0, 0x00, /* aac epid 257 */
/* CRC */
0x2f, 0x44, 0xb9, 0x9b, /* crc for aac */
/* stuffing 157 bytes */
Expand Down Expand Up @@ -104,7 +111,9 @@ const (

PidPAT uint16 = 0

// ------------------------------------------
// <iso13818-1.pdf> <Table 2-5> <page 38/174>
// ------------------------------------------
AdaptationFieldControlReserved uint8 = 0 // Reserved for future use by ISO/IEC
AdaptationFieldControlNo uint8 = 1 // No adaptation_field, payload only
AdaptationFieldControlOnly uint8 = 2 // Adaptation_field only, no payload
Expand All @@ -113,38 +122,45 @@ const (

// PMT
const (
// -----------------------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-29 Stream type assignments> <page 66/174>
// 0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
// 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
// -----------------------------------------------------------------------------
streamTypeAAC uint8 = 0x0F
streamTypeAVC uint8 = 0x1B
)

// PES
const (
// -----------------------------------------------------------------
// <iso13818-1.pdf> <Table 2-18-Stream_id assignments> <page 52/174>
streamIDAudio uint8 = 192 // 110x xxxx
// -----------------------------------------------------------------
streamIDAudio uint8 = 192 // 110x xxxx 0xC0
streamIDVideo uint8 = 224 // 1110 xxxx

// ------------------------------
// <iso13818-1.pdf> <page 53/174>
// ------------------------------
PTSDTSFlags0 uint8 = 0 // no PTS no DTS
PTSDTSFlags1 uint8 = 1 // forbidden
PTSDTSFlags2 uint8 = 2 // only PTS
PTSDTSFlags3 uint8 = 3 // both PTS and DTS
)

const (
pidVideo uint16 = 0x100
delay uint64 = 63000 // 700 ms PCR delay
PidVideo uint16 = 0x100
PidAudio uint16 = 0x101
delay uint64 = 63000 // 700 ms PCR delay TODO chef: 具体作用?

// TODO chef 这些在配置项中提供
outPath = "/tmp/lal/hls/" // 切片文件输出目录
fraglen = 5000 // 单个TS时长,单位毫秒
maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长,强制切割新的fragment,单位毫秒 * 90
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90
playlen = 30000 // m3u8列表时长
winfrags = playlen / fraglen // 多少个TS文件
maxAudioDelay = 300
audioBufSize = 1024 * 1024
Sync = 2
outPath = "/tmp/lal/hls/" // 切片文件输出目录
fraglen = 5000 // 单个TS时长,单位毫秒
playlen = 30000 // m3u8列表时长
maxfraglen = fraglen * 90 * 10 // 单个fragment超过这个时长,强制切割新的fragment,单位毫秒 * 90
negMaxfraglen = 1000 * 90 // 当前包时间戳回滚了,比当前fragment的首个时间戳还小,强制切割新的fragment,单位毫秒 * 90
winfrags = playlen / fraglen // 多少个TS文件
maxAudioDelay uint64 = 300 // 单位毫秒
audioBufSize = 1024 * 1024
Sync = 2
)
8 changes: 4 additions & 4 deletions pkg/hls/hls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
)

func TestParseFixedTSPacket(t *testing.T) {
h := hls.ParseTSPacketHeader(hls.FixedTSHeader)
h := hls.ParseTSPacketHeader(hls.FixedFragmentHeader)
nazalog.Debugf("%+v", h)
pat := hls.ParsePAT(hls.FixedTSHeader[5:])
pat := hls.ParsePAT(hls.FixedFragmentHeader[5:])
nazalog.Debugf("%+v", pat)

h = hls.ParseTSPacketHeader(hls.FixedTSHeader[188:])
h = hls.ParseTSPacketHeader(hls.FixedFragmentHeader[188:])
nazalog.Debugf("%+v", h)
pmt := hls.ParsePMT(hls.FixedTSHeader[188+5:])
pmt := hls.ParsePMT(hls.FixedFragmentHeader[188+5:])
nazalog.Debugf("%+v", pmt)
}
14 changes: 3 additions & 11 deletions pkg/hls/mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type MPEGTSFrame struct {
func mpegtsOpenFile(filename string) *os.File {
fp, err := os.Create(filename)
nazalog.Assert(nil, err)
mpegtsWriteFile(fp, FixedTSHeader)
mpegtsWriteFile(fp, FixedFragmentHeader)
return fp
}

func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
nazalog.Debugf("mpegts: write frame. %+v, size=%d", frame, len(b))
//nazalog.Debugf("mpegts: pid=%d, sid=%d, pts=%d, dts=%d, key=%b, size=%d", frame.pid, frame.sid, frame.pts, frame.dts, frame.key, len(b))

wpos := 0 // 当前packet的写入位置
lpos := 0 // 当前帧的处理位置
Expand Down Expand Up @@ -111,7 +111,6 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
// PES_CRC_flag 0
// PES_extension_flag 0
// PES_header_data_length
nazalog.Debug("write PES.")
packet[wpos] = 0x00 // packet_start_code_prefix
packet[wpos+1] = 0x00 //
packet[wpos+2] = 0x01 //
Expand Down Expand Up @@ -148,6 +147,7 @@ func mpegtsWriteFrame(fp *os.File, frame *MPEGTSFrame, b []byte) {
if frame.pts != frame.dts {
mpegtsWritePTS(packet[wpos:], 1, frame.dts+delay)
wpos += 5
//nazalog.Debugf("%d %d", (frame.pts)/90, (frame.dts)/90)
}

first = false
Expand Down Expand Up @@ -239,16 +239,8 @@ func mpegtsWritePTS(out []byte, fb uint8, pts uint64) {
out[4] = uint8(val)
}

//var debugCount int

func mpegtsWriteFile(fp *os.File, b []byte) {
//nazalog.Debugf("(%d) mpegts: write %d bytes.", debugCount, len(b))
//debugCount++
//if debugCount == 60 {
// nazalog.Debugf("%s", hex.Dump(b))
//}
_, _ = fp.Write(b)
_ = fp.Sync()
}

func mpegtsCloseFile(fp *os.File) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/hls/pat.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/q191201771/naza/pkg/nazabits"
)

// ---------------------------------------------------------------------------------------------------
// Program association section
// <iso13818-1.pdf> <2.4.4.3> <page 61/174>
// table_id [8b] *
Expand All @@ -31,6 +32,7 @@ import (
// program_map_PID [13b] ** if program_number == 0 then network_PID else then program_map_PID
// --------------
// CRC_32 [32b] ****
// ---------------------------------------------------------------------------------------------------
type PAT struct {
tid uint8
ssi uint8
Expand Down Expand Up @@ -62,9 +64,9 @@ func ParsePAT(b []byte) (pat PAT) {
pat.sn = br.ReadBits8(8)
pat.lsn = br.ReadBits8(8)

len := pat.sl - 9
length := pat.sl - 9

for i := uint16(0); i < len; i += 4 {
for i := uint16(0); i < length; i += 4 {
var ppe PATProgramElement
ppe.pn = br.ReadBits16(16)
br.ReadBits8(3)
Expand Down
Loading

0 comments on commit d7e7729

Please sign in to comment.