Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add muxer #16

Merged
merged 57 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
d07a182
starting
Jan 25, 2021
0798d32
pcr write
Jan 27, 2021
c9e1bff
go-astikit replacement until PR is merged
Jan 28, 2021
9c4fa08
packet header, adaptation field, dts write
Jan 28, 2021
8741f0c
dependency cleanup
Feb 1, 2021
905064d
get rid of panic/recover for writing
Feb 2, 2021
f298e1e
make writePCR code more adequate
Feb 2, 2021
5c71b25
writePacket and adaptation field length calculation
Feb 2, 2021
278fce9
writePacket and adaptation field length calculation
Feb 2, 2021
4047b66
s/188/MpegTsPacketSize/
Feb 2, 2021
7dc14a4
writePATSection
Feb 2, 2021
28edb16
Try* -> BitsWriterBatch
Feb 3, 2021
b4a87e9
descriptors WIP
Feb 3, 2021
e7a5601
go-astikit version bump
Feb 3, 2021
3dae8fc
descriptors WIP
Feb 4, 2021
b06993f
descriptor parsing tests refactored to allow reuse for descriptor wri…
Feb 5, 2021
3e5341d
descriptors writing tested
Feb 5, 2021
8aa5b6c
descriptors writing refactoring
Feb 5, 2021
e4788ba
descriptors done, PMT WIP
Feb 5, 2021
28a686e
write PMT section fix
Feb 11, 2021
fc82353
writePSIData works
Feb 12, 2021
f82bc07
WIP
Feb 17, 2021
db031e6
PES WIP
Feb 18, 2021
05b66f0
PES functions pass tests
Feb 19, 2021
705a8d3
minor fix
Feb 20, 2021
9391d51
muxer: pat & pmt
Feb 26, 2021
78bebcd
muxer: more tests
Feb 26, 2021
2475c9b
muxer: payload writing
Feb 27, 2021
05cec28
es-split WIP
Feb 27, 2021
69ec742
es-split seems to work
Feb 28, 2021
f060240
es-split PCR sync; some style fixes
Mar 2, 2021
9191588
Data.Kind cleanup
Mar 2, 2021
074bd9b
comment update
Mar 2, 2021
881e1c3
cleanup
Mar 2, 2021
de205b4
cleanup
Mar 2, 2021
ca51287
go-astikit dep replace removed
Mar 3, 2021
b07932b
comment fix
Mar 3, 2021
e98cd9e
minor fix
Mar 3, 2021
583de87
flush on pid change removed as it seems to be unnecessary
Mar 3, 2021
449222b
added some streamtype info funcs
Mar 4, 2021
3916b58
StreamType and PSITableTypeID are special types now
Mar 5, 2021
b069beb
comment cleanup
Mar 6, 2021
72ce363
use PSITableTypeId more instead of comparing strings
Mar 6, 2021
40fd6af
comment cleanup
Mar 6, 2021
6b7966e
PSITableTypeId -> PSITableTypeID
Mar 9, 2021
a6249bb
PESIsVideoStreamId -> PESIsVideoStreamID
Mar 9, 2021
5064bcb
PSITableTypeID.String() -> PSITableTypeID.Type()
Mar 9, 2021
2abf745
review fixes: first pack: constants and stuff
Mar 22, 2021
5d7277f
packet_buffer read() renamed to peek()
Mar 23, 2021
6af7b0b
tools are moved to cmd directory
Mar 24, 2021
a5c45f3
correct prefixes for muxer and demuxer opts
Mar 24, 2021
18725af
SetPCRPID instead of isPCRPID of AddElementaryStream
Mar 24, 2021
432588a
test fix and some comments
Mar 24, 2021
135fae5
muxer WritePacket export
Mar 24, 2021
f2a492a
`astits.New` renamed to `NewDemuxer`
Mar 24, 2021
e943295
astikit version bump; writeBytesN removed in favor of one in BitsWriter
Mar 24, 2021
bc4c488
WritePayload -> WriteData with MuxerData
Mar 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions cmd/astits-es-split/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"bufio"
"context"
"flag"
"fmt"
"github.com/asticode/go-astikit"
"github.com/asticode/go-astits"
"log"
"os"
"path"
"time"
)

const (
ioBufSize = 10 * 1024 * 1024
)

type muxerOut struct {
f *os.File
w *bufio.Writer
}

func main() {
flag.Usage = func() {
fmt.Fprintf(flag.CommandLine.Output(), "Split TS file into multiple files each holding one elementary stream")
fmt.Fprintf(flag.CommandLine.Output(), "%s [FLAGS] INPUT_FILE:\n", os.Args[0])
flag.PrintDefaults()
}
outDir := flag.String("o", "out", "Output dir, 'out' by default")
inputFile := astikit.FlagCmd()
flag.Parse()

infile, err := os.Open(inputFile)
if err != nil {
log.Fatalf("%v", err)
}
defer infile.Close()

_, err = os.Stat(*outDir)
if !os.IsNotExist(err) {
log.Fatalf("can't write to `%s': already exists", *outDir)
}

if err = os.MkdirAll(*outDir, os.ModePerm); err != nil {
log.Fatalf("%v", err)
}

demux := astits.NewDemuxer(
context.Background(),
bufio.NewReaderSize(infile, ioBufSize),
)

var pat *astits.PATData
// key is program number
pmts := map[uint16]*astits.PMTData{}
gotAllPMTs := false
// key is pid
muxers := map[uint16]*astits.Muxer{}
outfiles := map[uint16]muxerOut{}

pmtsPrinted := false

timeStarted := time.Now()
bytesWritten := 0

for {
d, err := demux.NextData()
if err != nil {
if err == astits.ErrNoMorePackets {
break
}
log.Fatalf("%v", err)
}

if d.PAT != nil {
pat = d.PAT
gotAllPMTs = false
continue
}

if d.PMT != nil {
pmts[d.PMT.ProgramNumber] = d.PMT

gotAllPMTs = true
for _, p := range pat.Programs {
_, ok := pmts[p.ProgramNumber]
if !ok {
gotAllPMTs = false
break
}
}

if !gotAllPMTs {
continue
}

if !pmtsPrinted {
log.Printf("Got all PMTs")
}
for _, pmt := range pmts {
if !pmtsPrinted {
log.Printf("\tProgram %d PCR PID %d", pmt.ProgramNumber, pmt.PCRPID)
}
for _, es := range pmt.ElementaryStreams {
_, ok := muxers[es.ElementaryPID]
if ok {
continue
}

esFilename := path.Join(*outDir, fmt.Sprintf("%d.ts", es.ElementaryPID))
outfile, err := os.Create(esFilename)
if err != nil {
log.Fatalf("%v", err)
}

bufWriter := bufio.NewWriterSize(outfile, ioBufSize)
mux := astits.NewMuxer(context.Background(), bufWriter)
err = mux.AddElementaryStream(*es)
if err != nil {
log.Fatalf("%v", err)
}
mux.SetPCRPID(es.ElementaryPID)

outfiles[es.ElementaryPID] = muxerOut{
f: outfile,
w: bufWriter,
}
muxers[es.ElementaryPID] = mux

if !pmtsPrinted {
log.Printf("\t\tES PID %d type %s",
es.ElementaryPID, es.StreamType.String(),
)
}
}
}

pmtsPrinted = true
continue
}

if !gotAllPMTs {
continue
}

if d.PES == nil {
continue
}

pid := d.FirstPacket.Header.PID
mux, ok := muxers[pid]
if !ok {
log.Printf("Got payload for unknown PID %d", pid)
continue
}

af := d.FirstPacket.AdaptationField

if af != nil && af.HasPCR {
af.HasPCR = false
}

var pcr *astits.ClockReference
if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent {
pcr = d.PES.Header.OptionalHeader.DTS
} else if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorOnlyPTS {
pcr = d.PES.Header.OptionalHeader.PTS
}

if pcr != nil {
if af == nil {
af = &astits.PacketAdaptationField{}
}
af.HasPCR = true
af.PCR = pcr
}

n, err := mux.WriteData(&astits.MuxerData{
PID: pid,
AdaptationField: af,
PES: d.PES,
})
if err != nil {
log.Fatalf("%v", err)
}

bytesWritten += n
}

timeDiff := time.Since(timeStarted)
log.Printf("%d bytes written at rate %.02f mb/s", bytesWritten, (float64(bytesWritten)/1024.0/1024.0)/timeDiff.Seconds())

for _, f := range outfiles {
if err = f.w.Flush(); err != nil {
log.Printf("Error flushing %s: %v", f.f.Name(), err)
}
if err = f.f.Close(); err != nil {
log.Printf("Error closing %s: %v", f.f.Name(), err)
}
}

log.Printf("Done")
}
14 changes: 7 additions & 7 deletions astits/main.go → cmd/astits-probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
}

// Create the demuxer
var dmx = astits.New(ctx, r)
var dmx = astits.NewDemuxer(ctx, r)

// Switch on command
switch cmd {
Expand Down Expand Up @@ -219,7 +219,7 @@ func data(dmx *astits.Demuxer) (err error) {
}

// Loop through data
var d *astits.Data
var d *astits.DemuxerData
log.Println("Fetching data...")
for {
// Get next data
Expand Down Expand Up @@ -272,7 +272,7 @@ func data(dmx *astits.Demuxer) (err error) {

func programs(dmx *astits.Demuxer) (o []*Program, err error) {
// Loop through data
var d *astits.Data
var d *astits.DemuxerData
var pgmsToProcess = make(map[uint16]bool)
var pgms = make(map[uint16]*Program)
log.Println("Fetching data...")
Expand Down Expand Up @@ -347,9 +347,9 @@ type Program struct {

// Stream represents a stream
type Stream struct {
Descriptors []string `json:"descriptors,omitempty"`
ID uint16 `json:"id,omitempty"`
Type uint8 `json:"type,omitempty"`
Descriptors []string `json:"descriptors,omitempty"`
ID uint16 `json:"id,omitempty"`
Type astits.StreamType `json:"type,omitempty"`
}

func newProgram(id, mapID uint16) *Program {
Expand All @@ -359,7 +359,7 @@ func newProgram(id, mapID uint16) *Program {
}
}

func newStream(id uint16, _type uint8) *Stream {
func newStream(id uint16, _type astits.StreamType) *Stream {
return &Stream{
ID: id,
Type: _type,
Expand Down
25 changes: 25 additions & 0 deletions crc32.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package astits

const (
crc32Polynomial = uint32(0xffffffff)
)

// computeCRC32 computes a CRC32
// https://stackoverflow.com/questions/35034042/how-to-calculate-crc32-in-psi-si-packet
func computeCRC32(bs []byte) uint32 {
return updateCRC32(crc32Polynomial, bs)
}

func updateCRC32(crc32 uint32, bs []byte) uint32 {
for _, b := range bs {
for i := 0; i < 8; i++ {
if (crc32 >= uint32(0x80000000)) != (b >= uint8(0x80)) {
crc32 = (crc32 << 1) ^ 0x04C11DB7
} else {
crc32 = crc32 << 1
}
b <<= 1
}
}
return crc32
}
23 changes: 15 additions & 8 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (

// PIDs
const (
PIDPAT = 0x0 // Program Association Table (PAT) contains a directory listing of all Program Map Tables.
PIDCAT = 0x1 // Conditional Access Table (CAT) contains a directory listing of all ITU-T Rec. H.222 entitlement management message streams used by Program Map Tables.
PIDTSDT = 0x2 // Transport Stream Description Table (TSDT) contains descriptors related to the overall transport stream
PIDNull = 0x1fff // Null Packet (used for fixed bandwidth padding)
PIDPAT uint16 = 0x0 // Program Association Table (PAT) contains a directory listing of all Program Map Tables.
PIDCAT uint16 = 0x1 // Conditional Access Table (CAT) contains a directory listing of all ITU-T Rec. H.222 entitlement management message streams used by Program Map Tables.
PIDTSDT uint16 = 0x2 // Transport Stream Description Table (TSDT) contains descriptors related to the overall transport stream
PIDNull uint16 = 0x1fff // Null Packet (used for fixed bandwidth padding)
)

// Data represents a data
type Data struct {
// DemuxerData represents a data parsed by Demuxer
type DemuxerData struct {
EIT *EITData
FirstPacket *Packet
NIT *NITData
Expand All @@ -27,8 +27,15 @@ type Data struct {
TOT *TOTData
}

// MuxerData represents a data to be written by Muxer
type MuxerData struct {
PID uint16
AdaptationField *PacketAdaptationField
PES *PESData
}

// parseData parses a payload spanning over multiple packets and returns a set of data
func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*Data, err error) {
func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*DemuxerData, err error) {
// Use custom parser first
if prs != nil {
var skip bool
Expand Down Expand Up @@ -82,7 +89,7 @@ func parseData(ps []*Packet, prs PacketsParser, pm programMap) (ds []*Data, err
}

// Append data
ds = append(ds, &Data{
ds = append(ds, &DemuxerData{
FirstPacket: ps[0],
PES: pesData,
PID: pid,
Expand Down
1 change: 1 addition & 0 deletions data_eit.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// EITData represents an EIT data
// Page: 36 | Chapter: 5.2.4 | Link: https://www.dvb.org/resources/public/standards/a38_dvb-si_specification.pdf
// (barbashov) the link above can be broken, alternative: https://dvb.org/wp-content/uploads/2019/12/a038_tm1217r37_en300468v1_17_1_-_rev-134_-_si_specification.pdf
type EITData struct {
Events []*EITDataEvent
LastTableID uint8
Expand Down
1 change: 1 addition & 0 deletions data_nit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// NITData represents a NIT data
// Page: 29 | Chapter: 5.2.1 | Link: https://www.dvb.org/resources/public/standards/a38_dvb-si_specification.pdf
// (barbashov) the link above can be broken, alternative: https://dvb.org/wp-content/uploads/2019/12/a038_tm1217r37_en300468v1_17_1_-_rev-134_-_si_specification.pdf
type NITData struct {
NetworkDescriptors []*Descriptor
NetworkID uint16
Expand Down
20 changes: 20 additions & 0 deletions data_pat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/asticode/go-astikit"
)

const (
patSectionEntryBytesSize = 4 // 16 bits + 3 reserved + 13 bits = 32 bits
)

// PATData represents a PAT data
// https://en.wikipedia.org/wiki/Program-specific_information
type PATData struct {
Expand Down Expand Up @@ -41,3 +45,19 @@ func parsePATSection(i *astikit.BytesIterator, offsetSectionsEnd int, tableIDExt
}
return
}

func calcPATSectionLength(d *PATData) uint16 {
return uint16(4 * len(d.Programs))
}

func writePATSection(w *astikit.BitsWriter, d *PATData) (int, error) {
b := astikit.NewBitsWriterBatch(w)

for _, p := range d.Programs {
b.Write(p.ProgramNumber)
b.WriteN(uint8(0xff), 3)
b.WriteN(p.ProgramMapID, 13)
}

return len(d.Programs) * patSectionEntryBytesSize, b.Err()
}
10 changes: 10 additions & 0 deletions data_pat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ func TestParsePATSection(t *testing.T) {
assert.Equal(t, d, pat)
assert.NoError(t, err)
}

func TestWritePatSection(t *testing.T) {
bw := &bytes.Buffer{}
w := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: bw})
n, err := writePATSection(w, pat)
assert.NoError(t, err)
assert.Equal(t, n, 8)
assert.Equal(t, n, bw.Len())
assert.Equal(t, patBytes(), bw.Bytes())
}
Loading