Skip to content

Commit

Permalink
Profiling es-split + fixes for probe (#49)
Browse files Browse the repository at this point in the history
* Bytewise crc32

* Bump go-astikit to 0.30. Make crc32 generator. Remove old crc32 calculation func and corresponding tests/benchmarks.

* Replace OpenFile with Create in crc32 generator. Some minor changes

* Es-split memory and cpu profiler. Es-split discard output. Es-split muxerOut as WriteCloser. Some formatting changes in Es-split. Fix profiler in Probe.

* Fix Close() method for muxerOut.

---------

Co-authored-by: Danil Korymov <[email protected]>
  • Loading branch information
k-danil and k-danil authored Mar 17, 2023
1 parent d8a24c5 commit bf157fb
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 47 deletions.
120 changes: 76 additions & 44 deletions cmd/astits-es-split/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package main
import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"github.com/pkg/profile"
"io"
"log"
"os"
"path"
Expand All @@ -19,8 +22,39 @@ const (
)

type muxerOut struct {
f *os.File
w *bufio.Writer
name string
closer io.Closer
*bufio.Writer
}

func newMuxerOut(name string, discard bool) (*muxerOut, error) {
var w io.Writer
var c io.Closer
if !discard {
f, err := os.Create(name)
if err != nil {
return nil, err
}
name = f.Name()
c = f
w = f
} else {
name += " --discard--"
w = io.Discard
}
return &muxerOut{name, c, bufio.NewWriterSize(w, ioBufSize)}, nil
}

func (m *muxerOut) Close() error {
if err := m.Flush(); err != nil {
log.Printf("Error flushing %s: %v", m.name, err)
}
if m.closer != nil {
if err := m.closer.Close(); err != nil {
return fmt.Errorf("error closing %s: %w", m.name, err)
}
}
return nil
}

func main() {
Expand All @@ -29,23 +63,34 @@ func main() {
fmt.Fprintf(flag.CommandLine.Output(), "%s INPUT_FILE [FLAGS]:\n", os.Args[0])
flag.PrintDefaults()
}

memoryProfiling := flag.Bool("mp", false, "if yes, memory profiling is enabled")
cpuProfiling := flag.Bool("cp", false, "if yes, cpu profiling is enabled")
discard := flag.Bool("discard", false, "if yes, output will be passed to discard (profiling/debug only)")
outDir := flag.String("o", "out", "Output dir, 'out' by default")
inputFile := astikit.FlagCmd()
flag.Parse()

if *cpuProfiling {
defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
} else if *memoryProfiling {
defer profile.Start(profile.MemProfile, profile.ProfilePath(".")).Stop()
}

infile, err := os.Open(inputFile)
if err != nil {
log.Fatalf("%v", err)
}
defer infile.Close()

_, err = os.Stat(*outDir)
if !os.IsNotExist(err) {
log.Fatalf("can't write to `%s': already exists", *outDir)
}
if !*discard {
if _, err = os.Stat(*outDir); !os.IsNotExist(err) {
log.Fatalf("can't write to '%s': already exists", *outDir)
}

if err = os.MkdirAll(*outDir, os.ModePerm); err != nil {
log.Fatalf("%v", err)
if err = os.MkdirAll(*outDir, os.ModePerm); err != nil {
log.Fatalf("%v", err)
}
}

demux := astits.NewDemuxer(
Expand All @@ -59,17 +104,16 @@ func main() {
gotAllPMTs := false
// key is pid
muxers := map[uint16]*astits.Muxer{}
outfiles := map[uint16]muxerOut{}

pmtsPrinted := false

timeStarted := time.Now()
bytesWritten := 0

var d *astits.DemuxerData
for {
d, err := demux.NextData()
if err != nil {
if err == astits.ErrNoMorePackets {
if d, err = demux.NextData(); err != nil {
if errors.Is(err, astits.ErrNoMorePackets) {
break
}
log.Fatalf("%v", err)
Expand All @@ -86,8 +130,7 @@ func main() {

gotAllPMTs = true
for _, p := range pat.Programs {
_, ok := pmts[p.ProgramNumber]
if !ok {
if _, ok := pmts[p.ProgramNumber]; !ok {
gotAllPMTs = false
break
}
Expand All @@ -105,29 +148,26 @@ func main() {
log.Printf("\tProgram %d PCR PID %d", pmt.ProgramNumber, pmt.PCRPID)
}
for _, es := range pmt.ElementaryStreams {
_, ok := muxers[es.ElementaryPID]
if ok {
if _, ok := muxers[es.ElementaryPID]; ok {
continue
}

esFilename := path.Join(*outDir, fmt.Sprintf("%d.ts", es.ElementaryPID))
outfile, err := os.Create(esFilename)
if err != nil {
var outWriter *muxerOut
if outWriter, err = newMuxerOut(esFilename, *discard); err != nil {
log.Fatalf("%v", err)
}

bufWriter := bufio.NewWriterSize(outfile, ioBufSize)
mux := astits.NewMuxer(context.Background(), bufWriter)
err = mux.AddElementaryStream(*es)
if err != nil {
defer func() {
if err = outWriter.Close(); err != nil {
log.Print(err)
}
}()

mux := astits.NewMuxer(context.Background(), outWriter)
if err = mux.AddElementaryStream(*es); err != nil {
log.Fatalf("%v", err)
}
mux.SetPCRPID(es.ElementaryPID)

outfiles[es.ElementaryPID] = muxerOut{
f: outfile,
w: bufWriter,
}
muxers[es.ElementaryPID] = mux

if !pmtsPrinted {
Expand Down Expand Up @@ -164,10 +204,11 @@ func main() {
}

var pcr *astits.ClockReference
if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorBothPresent {
pcr = d.PES.Header.OptionalHeader.DTS
} else if d.PES.Header.OptionalHeader.PTSDTSIndicator == astits.PTSDTSIndicatorOnlyPTS {
switch d.PES.Header.OptionalHeader.PTSDTSIndicator {
case astits.PTSDTSIndicatorOnlyPTS:
pcr = d.PES.Header.OptionalHeader.PTS
case astits.PTSDTSIndicatorBothPresent:
pcr = d.PES.Header.OptionalHeader.DTS
}

if pcr != nil {
Expand All @@ -178,29 +219,20 @@ func main() {
af.PCR = pcr
}

n, err := mux.WriteData(&astits.MuxerData{
var written int
if written, err = mux.WriteData(&astits.MuxerData{
PID: pid,
AdaptationField: af,
PES: d.PES,
})
if err != nil {
}); err != nil {
log.Fatalf("%v", err)
}

bytesWritten += n
bytesWritten += written
}

timeDiff := time.Since(timeStarted)
log.Printf("%d bytes written at rate %.02f mb/s", bytesWritten, (float64(bytesWritten)/1024.0/1024.0)/timeDiff.Seconds())

for _, f := range outfiles {
if err = f.w.Flush(); err != nil {
log.Printf("Error flushing %s: %v", f.f.Name(), err)
}
if err = f.f.Close(); err != nil {
log.Printf("Error closing %s: %v", f.f.Name(), err)
}
}

log.Printf("Done")
}
12 changes: 9 additions & 3 deletions cmd/astits-probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,24 @@ func main() {
case "data":
// Fetch data
if err = data(dmx); err != nil {
log.Fatal(fmt.Errorf("astits: fetching data failed: %w", err))
if !errors.Is(err, astits.ErrNoMorePackets) {
log.Fatal(fmt.Errorf("astits: fetching data failed: %w", err))
}
}
case "packets":
// Fetch packets
if err = packets(dmx); err != nil {
log.Fatal(fmt.Errorf("astits: fetching packets failed: %w", err))
if !errors.Is(err, astits.ErrNoMorePackets) {
log.Fatal(fmt.Errorf("astits: fetching packets failed: %w", err))
}
}
default:
// Fetch the programs
var pgms []*Program
if pgms, err = programs(dmx); err != nil {
log.Fatal(fmt.Errorf("astits: fetching programs failed: %w", err))
if !errors.Is(err, astits.ErrNoMorePackets) {
log.Fatal(fmt.Errorf("astits: fetching programs failed: %w", err))
}
}

// Print
Expand Down

0 comments on commit bf157fb

Please sign in to comment.