-
Notifications
You must be signed in to change notification settings - Fork 36
/
publisher.go
111 lines (105 loc) · 2.84 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package webrtc
import (
"sync/atomic"
"time"
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v3"
"go.uber.org/zap"
. "m7s.live/engine/v4"
"m7s.live/engine/v4/codec"
)
type WebRTCPublisher struct {
Publisher
WebRTCIO
audioTrack atomic.Pointer[TrackRemote]
videoTrack atomic.Pointer[TrackRemote]
}
func (puber *WebRTCPublisher) OnEvent(event any) {
switch event.(type) {
case IPublisher:
puber.OnTrack(puber.onTrack)
}
puber.Publisher.OnEvent(event)
}
func (puber *WebRTCPublisher) onTrack(track *TrackRemote, receiver *RTPReceiver) {
puber.Info("onTrack", zap.String("kind", track.Kind().String()), zap.Uint8("payloadType", uint8(track.Codec().PayloadType)))
if codecP := track.Codec(); track.Kind() == RTPCodecTypeAudio {
puber.audioTrack.Store(track)
if puber.AudioTrack == nil {
switch codecP.PayloadType {
case 111:
puber.CreateAudioTrack(codec.CodecID_OPUS)
case 8:
puber.CreateAudioTrack(codec.CodecID_PCMA)
case 0:
puber.CreateAudioTrack(codec.CodecID_PCMU)
default:
puber.AudioTrack = nil
puber.Config.PubAudio = false
return
}
}
for {
if puber.audioTrack.Load() != track {
return
}
rtpItem := puber.AudioTrack.GetRTPFromPool()
if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
puber.AudioTrack.WriteRTP(rtpItem)
} else {
puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
rtpItem.Recycle()
return
}
}
} else {
puber.videoTrack.Store(track)
if puber.VideoTrack == nil {
switch codecP.PayloadType {
case 45:
puber.CreateVideoTrack(codec.CodecID_AV1, byte(codecP.PayloadType))
default:
puber.CreateVideoTrack(codec.CodecID_H264, byte(codecP.PayloadType))
}
}
go puber.writeRTCP(track)
for {
if puber.videoTrack.Load() != track {
return
}
rtpItem := puber.VideoTrack.GetRTPFromPool()
if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
if rtpItem.Value.Extension {
for _, id := range rtpItem.Value.GetExtensionIDs() {
puber.Debug("extension", zap.Uint8("id", id), zap.Binary("value", rtpItem.Value.GetExtension(id)))
}
}
puber.VideoTrack.WriteRTP(rtpItem)
} else {
puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
rtpItem.Recycle()
return
}
}
}
}
func (puber *WebRTCPublisher) writeRTCP(track *TrackRemote) {
ticker := time.NewTicker(webrtcConfig.PLI)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if puber.videoTrack.Load() != track {
return
}
if rtcpErr := puber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
puber.Error("writeRTCP", zap.Error(rtcpErr))
return
}
case <-puber.Done():
return
}
}
}