Skip to content

Commit

Permalink
issue ossrs#222: fix audio stream stutter btween ovelay TS file;
Browse files Browse the repository at this point in the history
1. let continuity_counter continually between overlay ts;
2. copy audio data when transcoding ts;
  • Loading branch information
suzp1984 committed Nov 25, 2024
1 parent 9b475d9 commit abdcf82
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 2 deletions.
33 changes: 31 additions & 2 deletions platform/transcript.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ossrs/go-oryx-lib/errors"
ohttp "github.com/ossrs/go-oryx-lib/http"
"github.com/ossrs/go-oryx-lib/logger"

// Use v8 because we use Go 1.16+, while v9 requires Go 1.18+
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
Expand Down Expand Up @@ -1295,6 +1296,9 @@ type TranscriptSegment struct {
// Whether user clear the ASR text of this segment.
UserClearASR bool `json:"uca,omitempty"`

// The map that host the pid -> last cc
OverlayTSLastCC map[uint16]uint8 `json:"overlay_ts_last_cc,omitempty"`

// The cost to transcode the TS file to audio file.
CostExtractAudio time.Duration `json:"eac,omitempty"`
// The cost to do ASR, converting speech to text.
Expand Down Expand Up @@ -1402,6 +1406,16 @@ func (v *TranscriptQueue) first() *TranscriptSegment {
return v.Segments[0]
}

func (v *TranscriptQueue) findBy(seqNo uint64) *TranscriptSegment {
for i := len(v.Segments) - 1; i >= 0; i-- {
if v.Segments[i].OverlayFile.SeqNo == seqNo {
return v.Segments[i]
}
}

return nil
}

func (v *TranscriptQueue) clearSubtitle(tsid string) error {
v.lock.Lock()
defer v.lock.Unlock()
Expand Down Expand Up @@ -1594,7 +1608,7 @@ func (v *TranscriptTask) OnTsSegment(ctx context.Context, msg *SrsOnHlsObject) e
func() {
// We must not update the queue, when persistence goroutine is working.
v.lock.Lock()
v.lock.Unlock()
defer v.lock.Unlock()

v.LiveQueue.enqueue(&TranscriptSegment{
Msg: msg.Msg,
Expand Down Expand Up @@ -1978,7 +1992,7 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error {
args = append(args, strings.Fields(videoCodecParams)...)
// Generate other parameters for FFmpeg.
args = append(args, []string{
"-c:a", "aac",
"-c:a", "copy",
"-copyts", // To keep the pts not changed.
"-y", overlayFile.File,
}...)
Expand All @@ -2004,6 +2018,21 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error {
}
overlayFile.Size = uint64(stats.Size())

// recaculate the continuity_counter of overlayFile
// 1. get previous segment in overlayQueue
// 2. adjust current ts segment's continuity_counter
// 2. change segment.OverlayTSLastCC
previousTSCC := map[uint16]uint8{}
if previousSegment := v.OverlayQueue.findBy(overlayFile.SeqNo - 1); previousSegment != nil {
previousTSCC = previousSegment.OverlayTSLastCC
}

if cc, err := overlayFile.AdjustCC(previousTSCC); err != nil {
logger.Wf(ctx, "Error when Adjust Overlay TS file %v", overlayFile.File)
} else {
segment.OverlayTSLastCC = cc
}

// Dequeue the segment from live queue and attach to asr queue.
func() {
v.lock.Lock()
Expand Down
91 changes: 91 additions & 0 deletions platform/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,97 @@ func (v *TsFile) String() string {
)
}

// Adjust ts packet's continuity_counter by previous ts segment.
func (v *TsFile) AdjustCC(previousTSCC map[uint16]uint8) (map[uint16]uint8, error) {
const BUF_SIZE = 188
const TS_VIDEO_AVC_PID = 256
const TS_AUDIO_AAC_PID = 257
const TS_AUDIO_MP3_PID = 258

tmpFileName := v.File + ".tmp"

err := func() error {
file, err := os.Open(v.File)
if err != nil {
return err
}
defer file.Close()

tmpFile, err := os.Create(tmpFileName)
if err != nil {
return err
}
defer tmpFile.Close()

buffer := make([]byte, BUF_SIZE)

for {
bytesRead, err := file.Read(buffer)
if err != nil {
if err == io.EOF {
return nil
}

return err
}

if bytesRead != BUF_SIZE {
return errors.Errorf("Read TS packet with size %v", bytesRead)
}

if syncByte := buffer[0]; syncByte != 0x47 {
return errors.Errorf("TS packet sync byte is not 0x47")
}

// transport_error_indicator := uint8(buffer[1] & 0x80 >> 7)
// payload_unit_start_indicator := uint8(buffer[1] & 0x40 >> 6)
// transport_priority := uint8(buffer[1] & 0x20 >> 5)
pid := uint16(buffer[1]&0x1f)<<8 + uint16(buffer[2])
// transport_scrambling_control := uint8(buffer[3] & 0xC0 >> 6)
adaptationFieldControl := uint8(buffer[3] & 0x30 >> 4)
continuityCounter := uint8(buffer[3] & 0x0f)

// check whether ts packet has payload: adaptationFieldControl == 01 or 11
hasPayload := (adaptationFieldControl & 0x01) == 1
isAVPid := pid == TS_VIDEO_AVC_PID || pid == TS_AUDIO_AAC_PID || pid == TS_AUDIO_MP3_PID
if hasPayload && isAVPid {
if counter, hasKey := previousTSCC[pid]; hasKey {
if continuityCounter != counter+1 {
continuityCounter = counter + 1
if continuityCounter > 15 {
continuityCounter = 0
}

buffer[3] = (buffer[3] & 0xf0) | continuityCounter
}

previousTSCC[pid] = continuityCounter
} else {
previousTSCC[pid] = continuityCounter
}
}

if _, err := tmpFile.Write(buffer); err != nil {
return err
}
}
}()

if err != nil {
return nil, err
}

if _, err := os.Stat(tmpFileName); err != nil {
return nil, err
}

if err := os.Rename(tmpFileName, v.File); err != nil {
return nil, err
}

return previousTSCC, nil
}

// M3u8VoDArtifact is a HLS VoD object. Because each Dvr/Vod/RecordM3u8Stream might be DVR to many VoD file,
// each is an M3u8VoDArtifact. For example, when user publish live/livestream, there is a Dvr/Vod/RecordM3u8Stream and
// M3u8VoDArtifact, then user unpublish stream and after some seconds a VoD file is generated by M3u8VoDArtifact. Then
Expand Down

0 comments on commit abdcf82

Please sign in to comment.