diff --git a/platform/transcript.go b/platform/transcript.go index 51340915..903eff5f 100644 --- a/platform/transcript.go +++ b/platform/transcript.go @@ -21,6 +21,7 @@ import ( "github.com/ossrs/go-oryx-lib/errors" ohttp "github.com/ossrs/go-oryx-lib/http" "github.com/ossrs/go-oryx-lib/logger" + // Use v8 because we use Go 1.16+, while v9 requires Go 1.18+ "github.com/go-redis/redis/v8" "github.com/google/uuid" @@ -1295,6 +1296,9 @@ type TranscriptSegment struct { // Whether user clear the ASR text of this segment. UserClearASR bool `json:"uca,omitempty"` + // The map that host the pid -> last cc + OverlayTSLastCC map[uint16]uint8 `json:"start_cc,omitempty"` + // The cost to transcode the TS file to audio file. CostExtractAudio time.Duration `json:"eac,omitempty"` // The cost to do ASR, converting speech to text. @@ -1402,6 +1406,16 @@ func (v *TranscriptQueue) first() *TranscriptSegment { return v.Segments[0] } +func (v *TranscriptQueue) find_by(seq_no uint64) *TranscriptSegment { + for i := len(v.Segments) - 1; i >= 0; i-- { + if v.Segments[i].OverlayFile.SeqNo == seq_no { + return v.Segments[i] + } + } + + return nil +} + func (v *TranscriptQueue) clearSubtitle(tsid string) error { v.lock.Lock() defer v.lock.Unlock() @@ -1594,7 +1608,7 @@ func (v *TranscriptTask) OnTsSegment(ctx context.Context, msg *SrsOnHlsObject) e func() { // We must not update the queue, when persistence goroutine is working. v.lock.Lock() - v.lock.Unlock() + defer v.lock.Unlock() v.LiveQueue.enqueue(&TranscriptSegment{ Msg: msg.Msg, @@ -1978,7 +1992,7 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { args = append(args, strings.Fields(videoCodecParams)...) // Generate other parameters for FFmpeg. args = append(args, []string{ - "-c:a", "aac", + "-c:a", "copy", "-copyts", // To keep the pts not changed. "-y", overlayFile.File, }...) @@ -2004,6 +2018,21 @@ func (v *TranscriptTask) DriveFixQueue(ctx context.Context) error { } overlayFile.Size = uint64(stats.Size()) + // recaculate the continuity_counter of overlayFile + // 1. get previous segment in overlayQueue + // 2. adjust current ts segment's continuity_counter + // 2. change segment.OverlayTSLastCC + previous_ts_cc := map[uint16]uint8{} + if previous_segment := v.OverlayQueue.find_by(overlayFile.SeqNo - 1); previous_segment != nil { + previous_ts_cc = previous_segment.OverlayTSLastCC + } + + if cc, err := overlayFile.AdjustCC(previous_ts_cc); err != nil { + logger.Wf(ctx, "Error when Adjust Overlay TS file %v", overlayFile.File) + } else { + segment.OverlayTSLastCC = cc + } + // Dequeue the segment from live queue and attach to asr queue. func() { v.lock.Lock() diff --git a/platform/utils.go b/platform/utils.go index c8283a7f..9010c3bf 100644 --- a/platform/utils.go +++ b/platform/utils.go @@ -1129,6 +1129,97 @@ func (v *TsFile) String() string { ) } +// Adjust ts packet's continuity_counter by previous ts segment. +func (v *TsFile) AdjustCC(cc map[uint16]uint8) (map[uint16]uint8, error) { + const BUF_SIZE = 188 + const TS_VIDEO_AVC_PID = 256 + const TS_AUDIO_AAC_PID = 257 + const TS_AUDIO_MP3_PID = 258 + + tmp_file_name := v.File + ".tmp" + + err := func() error { + file, err := os.Open(v.File) + if err != nil { + return err + } + defer file.Close() + + tmp_file, err := os.Create(tmp_file_name) + if err != nil { + return err + } + defer tmp_file.Close() + + buffer := make([]byte, BUF_SIZE) + + for { + bytes_read, err := file.Read(buffer) + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + if bytes_read != BUF_SIZE { + return errors.Errorf("Read TS packet with size %v", bytes_read) + } + + if sync_byte := buffer[0]; sync_byte != 0x47 { + return errors.Errorf("TS packet sync byte is not 0x47") + } + + // transport_error_indicator := uint8(buffer[1] & 0x80 >> 7) + // payload_unit_start_indicator := uint8(buffer[1] & 0x40 >> 6) + // transport_priority := uint8(buffer[1] & 0x20 >> 5) + pid := uint16(buffer[1]&0x1f)<<8 + uint16(buffer[2]) + // transport_scrambling_control := uint8(buffer[3] & 0xC0 >> 6) + adaptation_field_control := uint8(buffer[3] & 0x30 >> 4) + continuity_counter := uint8(buffer[3] & 0x0f) + + // check whether ts packet has payload: adaptation_field_control == 01 or 11 + has_payload := (adaptation_field_control & 0x01) == 1 + is_AV_PID := pid == TS_VIDEO_AVC_PID || pid == TS_AUDIO_AAC_PID || pid == TS_AUDIO_MP3_PID + if has_payload && is_AV_PID { + if counter, hasKey := cc[pid]; hasKey { + if continuity_counter != counter+1 { + continuity_counter = counter + 1 + if continuity_counter > 15 { + continuity_counter = 0 + } + + buffer[3] = (buffer[3] & 0xf0) | continuity_counter + } + + cc[pid] = continuity_counter + } else { + cc[pid] = continuity_counter + } + } + + if _, err := tmp_file.Write(buffer); err != nil { + return err + } + } + }() + + if err != nil { + return nil, err + } + + if _, err := os.Stat(tmp_file_name); err != nil { + return nil, err + } + + if err := os.Rename(tmp_file_name, v.File); err != nil { + return nil, err + } + + return cc, nil +} + // M3u8VoDArtifact is a HLS VoD object. Because each Dvr/Vod/RecordM3u8Stream might be DVR to many VoD file, // each is an M3u8VoDArtifact. For example, when user publish live/livestream, there is a Dvr/Vod/RecordM3u8Stream and // M3u8VoDArtifact, then user unpublish stream and after some seconds a VoD file is generated by M3u8VoDArtifact. Then