Skip to content

Commit

Permalink
Add muxer (#16)
Browse files Browse the repository at this point in the history
* starting

* pcr write

* go-astikit replacement until PR is merged

* packet header, adaptation field, dts write

* dependency cleanup

* get rid of panic/recover for writing

* make writePCR code more adequate

* writePacket and adaptation field length calculation

* writePacket and adaptation field length calculation

* s/188/MpegTsPacketSize/

* writePATSection

* Try* -> BitsWriterBatch

* descriptors WIP

* go-astikit version bump

* descriptors WIP

* descriptor parsing tests refactored to allow reuse for descriptor writing

* descriptors writing tested

* descriptors writing refactoring

* descriptors done, PMT WIP

* write PMT section fix

* writePSIData works

* WIP

* PES WIP

* PES functions pass tests

* minor fix

* muxer: pat & pmt

* muxer: more tests

* muxer: payload writing

* es-split WIP

* es-split seems to work

* es-split PCR sync; some style fixes

* Data.Kind cleanup

* comment update

* cleanup

* cleanup

* go-astikit dep replace removed

* comment fix

* minor fix

* flush on pid change removed as it seems to be unnecessary

* added some streamtype info funcs

* StreamType and PSITableTypeID are special types now

* comment cleanup

* use PSITableTypeId more instead of comparing strings

* comment cleanup

* PSITableTypeId -> PSITableTypeID

* PESIsVideoStreamId -> PESIsVideoStreamID

* PSITableTypeID.String() -> PSITableTypeID.Type()

* review fixes: first pack: constants and stuff

* packet_buffer read() renamed to peek()

* tools are moved to cmd directory

* correct prefixes for muxer and demuxer opts

* SetPCRPID instead of isPCRPID of AddElementaryStream

* test fix and some comments

* muxer WritePacket export

* `astits.New` renamed to `NewDemuxer`

* astikit version bump; writeBytesN removed in favor of one in BitsWriter

* WritePayload -> WriteData with MuxerData

Co-authored-by: Ilya Barbashov <[email protected]>
  • Loading branch information
barbashov and Ilya Barbashov authored Mar 24, 2021
1 parent 35d9da8 commit 5caf397
Show file tree
Hide file tree
Showing 34 changed files with 4,223 additions and 704 deletions.
205 changes: 205 additions & 0 deletions cmd/astits-es-split/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"github.com/asticode/go-astikit"
"github.com/asticode/go-astits"
"log"
"os"
"path"
"time"
)

const (
ioBufSize = 10 * 1024 * 1024
)

type muxerOut struct {
f *os.File
w *bufio.Writer
}

func main() {
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "Split TS file into multiple files each holding one elementary stream")
fmt.Fprintf(flag.CommandLine.Output(), "%s [FLAGS] INPUT_FILE:\n", os.Args[0])
flag.PrintDefaults()
}
outDir := flag.String("o", "out", "Output dir, 'out' by default")
inputFile := astikit.FlagCmd()
flag.Parse()

infile, err := os.Open(inputFile)
if err != nil {
log.Fatalf("%v", err)
}
defer infile.Close()

_, err = os.Stat(*outDir)
if !os.IsNotExist(err) {
log.Fatalf("can't write to `%s': already exists", *outDir)
}

if err = os.MkdirAll(*outDir, os.ModePerm); err != nil {
log.Fatalf("%v", err)
}

demux := astits.NewDemuxer(
context.Background(),
bufio.NewReaderSize(infile, ioBufSize),
)

var pat *astits.PATData
// key is program number
pmts := map[uint16]*astits.PMTData{}
gotAllPMTs := false
// key is pid
muxers := map[uint16]*astits.Muxer{}
outfiles := map[uint16]muxerOut{}

pmtsPrinted := false

timeStarted := time.Now()
bytesWritten := 0

for {
d, err := demux.NextData()
if err != nil {
if err == astits.ErrNoMorePackets {
break
}
log.Fatalf("%v", err)
}

if d.PAT != nil {
pat = d.PAT
gotAllPMTs = false
continue
}

if d.PMT != nil {
pmts[d.PMT.ProgramNumber] = d.PMT

gotAllPMTs = true
for _, p := range pat.Programs {
_, ok := pmts[p.ProgramNumber]
if !ok {
gotAllPMTs = false
break
}
}

if !gotAllPMTs {
continue
}

if !pmtsPrinted {
log.Printf("Got all PMTs")
}
for _, pmt := range pmts {
if !pmtsPrinted {
log.Printf("\tProgram %d PCR PID %d", pmt.ProgramNumber, pmt.PCRPID)
}
for _, es := range pmt.ElementaryStreams {
_, ok := muxers[es.ElementaryPID]
if ok {
continue
}

esFilename := path.Join(*outDir, fmt.Sprintf("%d.ts", es.ElementaryPID))
outfile, err := os.Create(esFilename)
if err != nil {
log.Fatalf("%v", err)
}

bufWriter := bufio.NewWriterSize(outfile, ioBufSize)
mux := astits.NewMuxer(context.Background(), bufWriter)
err = mux.AddElementaryStream(*es)
if err != nil {
log.Fatalf("%v", err)
}
mux.SetPCRPID(es.ElementaryPID)

outfiles[es.ElementaryPID] = muxerOut{
f: outfile,
w: bufWriter,
}
muxers[es.ElementaryPID] = mux

if !pmtsPrinted {
log.Printf("\t\tES PID %d type %s",
es.ElementaryPID, es.StreamType.String(),
)
}
}
}

pmtsPrinted = true
continue
}

if !gotAllPMTs {
continue
}

if d.PES == nil {
continue
}

pid := d.FirstPacket.Header.PID
mux, ok := muxers[pid]
if !ok {
log.Printf("Got payload for unknown PID %d", pid)
continue
}

af := d.FirstPacket.AdaptationField

if af != nil && af.HasPCR {
af.HasPCR = false
}

var pcr *astits.ClockReference
if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent {
pcr = d.PES.Header.OptionalHeader.DTS
} else if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorOnlyPTS {
pcr = d.PES.Header.OptionalHeader.PTS
}

if pcr != nil {
if af == nil {
af = &astits.PacketAdaptationField{}
}
af.HasPCR = true
af.PCR = pcr
}

n, err := mux.WriteData(&astits.MuxerData{
PID: pid,
AdaptationField: af,
PES: d.PES,
})
if err != nil {
log.Fatalf("%v", err)
}

bytesWritten += n
}

timeDiff := time.Since(timeStarted)
log.Printf("%d bytes written at rate %.02f mb/s", bytesWritten, (float64(bytesWritten)/1024.0/1024.0)/timeDiff.Seconds())

for _, f := range outfiles {
if err = f.w.Flush(); err != nil {
log.Printf("Error flushing %s: %v", f.f.Name(), err)
}
if err = f.f.Close(); err != nil {
log.Printf("Error closing %s: %v", f.f.Name(), err)
}
}

log.Printf("Done")
}
14 changes: 7 additions & 7 deletions astits/main.go → cmd/astits-probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
}

// Create the demuxer
var dmx = astits.New(ctx, r)
var dmx = astits.NewDemuxer(ctx, r)

// Switch on command
switch cmd {
Expand Down Expand Up @@ -219,7 +219,7 @@ func data(dmx *astits.Demuxer) (err error) {
}

// Loop through data
var d *astits.Data
var d *astits.DemuxerData
log.Println("Fetching data...")
for {
// Get next data
Expand Down Expand Up @@ -272,7 +272,7 @@ func data(dmx *astits.Demuxer) (err error) {

func programs(dmx *astits.Demuxer) (o []*Program, err error) {
// Loop through data
var d *astits.Data
var d *astits.DemuxerData
var pgmsToProcess = make(map[uint16]bool)
var pgms = make(map[uint16]*Program)
log.Println("Fetching data...")
Expand Down Expand Up @@ -347,9 +347,9 @@ type Program struct {

// Stream represents a stream
type Stream struct {
Descriptors []string `json:"descriptors,omitempty"`
ID uint16 `json:"id,omitempty"`
Type uint8 `json:"type,omitempty"`
Descriptors []string `json:"descriptors,omitempty"`
ID uint16 `json:"id,omitempty"`
Type astits.StreamType `json:"type,omitempty"`
}

func newProgram(id, mapID uint16) *Program {
Expand All @@ -359,7 +359,7 @@ func newProgram(id, mapID uint16) *Program {
}
}

func newStream(id uint16, _type uint8) *Stream {
func newStream(id uint16, _type astits.StreamType) *Stream {
return &Stream{
ID: id,
Type: _type,
Expand Down
25 changes: 25 additions & 0 deletions crc32.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package astits

const (
crc32Polynomial = uint32(0xffffffff)
)

// computeCRC32 computes a CRC32
// https://stackoverflow.com/questions/35034042/how-to-calculate-crc32-in-psi-si-packet
func computeCRC32(bs []byte) uint32 {
return updateCRC32(crc32Polynomial, bs)
}

func updateCRC32(crc32 uint32, bs []byte) uint32 {
for _, b := range bs {
for i := 0; i < 8; i++ {
if (crc32 >= uint32(0x80000000)) != (b >= uint8(0x80)) {
crc32 = (crc32 << 1) ^ 0x04C11DB7
} else {
crc32 = crc32 << 1
}
b <<= 1
}
}
return crc32
}
23 changes: 15 additions & 8 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

// PIDs
const (
PIDPAT = 0x0 // Program Association Table (PAT) contains a directory listing of all Program Map Tables.
PIDCAT = 0x1 // Conditional Access Table (CAT) contains a directory listing of all ITU-T Rec. H.222 entitlement management message streams used by Program Map Tables.
PIDTSDT = 0x2 // Transport Stream Description Table (TSDT) contains descriptors related to the overall transport stream
PIDNull = 0x1fff // Null Packet (used for fixed bandwidth padding)
PIDPAT uint16 = 0x0 // Program Association Table (PAT) contains a directory listing of all Program Map Tables.
PIDCAT uint16 = 0x1 // Conditional Access Table (CAT) contains a directory listing of all ITU-T Rec. H.222 entitlement management message streams used by Program Map Tables.
PIDTSDT uint16 = 0x2 // Transport Stream Description Table (TSDT) contains descriptors related to the overall transport stream
PIDNull uint16 = 0x1fff // Null Packet (used for fixed bandwidth padding)
)

// Data represents a data
type Data struct {
// DemuxerData represents a data parsed by Demuxer
type DemuxerData struct {
EIT *EITData
FirstPacket *Packet
NIT *NITData
Expand All @@ -27,8 +27,15 @@ type Data struct {
TOT *TOTData
}

// MuxerData represents a data to be written by Muxer
type MuxerData struct {
PID uint16
AdaptationField *PacketAdaptationField
PES *PESData
}

// parseData parses a payload spanning over multiple packets and returns a set of data
func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*Data, err error) {
func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*DemuxerData, err error) {
// Use custom parser first
if prs != nil {
var skip bool
Expand Down Expand Up @@ -82,7 +89,7 @@ func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*Data, err
}

// Append data
ds = append(ds, &Data{
ds = append(ds, &DemuxerData{
FirstPacket: ps[0],
PES: pesData,
PID: pid,
Expand Down
1 change: 1 addition & 0 deletions data_eit.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// EITData represents an EIT data
// Page: 36 | Chapter: 5.2.4 | Link: https://www.dvb.org/resources/public/standards/a38_dvb-si_specification.pdf
// (barbashov) the link above can be broken, alternative: https://dvb.org/wp-content/uploads/2019/12/a038_tm1217r37_en300468v1_17_1_-_rev-134_-_si_specification.pdf
type EITData struct {
Events []*EITDataEvent
LastTableID uint8
Expand Down
1 change: 1 addition & 0 deletions data_nit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// NITData represents a NIT data
// Page: 29 | Chapter: 5.2.1 | Link: https://www.dvb.org/resources/public/standards/a38_dvb-si_specification.pdf
// (barbashov) the link above can be broken, alternative: https://dvb.org/wp-content/uploads/2019/12/a038_tm1217r37_en300468v1_17_1_-_rev-134_-_si_specification.pdf
type NITData struct {
NetworkDescriptors []*Descriptor
NetworkID uint16
Expand Down
20 changes: 20 additions & 0 deletions data_pat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/asticode/go-astikit"
)

const (
patSectionEntryBytesSize = 4 // 16 bits + 3 reserved + 13 bits = 32 bits
)

// PATData represents a PAT data
// https://en.wikipedia.org/wiki/Program-specific_information
type PATData struct {
Expand Down Expand Up @@ -41,3 +45,19 @@ func parsePATSection(i *astikit.BytesIterator, offsetSectionsEnd int, tableIDExt
}
return
}

func calcPATSectionLength(d *PATData) uint16 {
return uint16(4 * len(d.Programs))
}

func writePATSection(w *astikit.BitsWriter, d *PATData) (int, error) {
b := astikit.NewBitsWriterBatch(w)

for _, p := range d.Programs {
b.Write(p.ProgramNumber)
b.WriteN(uint8(0xff), 3)
b.WriteN(p.ProgramMapID, 13)
}

return len(d.Programs) * patSectionEntryBytesSize, b.Err()
}
10 changes: 10 additions & 0 deletions data_pat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ func TestParsePATSection(t *testing.T) {
assert.Equal(t, d, pat)
assert.NoError(t, err)
}

func TestWritePatSection(t *testing.T) {
bw := &bytes.Buffer{}
w := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: bw})
n, err := writePATSection(w, pat)
assert.NoError(t, err)
assert.Equal(t, n, 8)
assert.Equal(t, n, bw.Len())
assert.Equal(t, patBytes(), bw.Bytes())
}
Loading

0 comments on commit 5caf397

Please sign in to comment.