From 7ecf8fbe03a4ef257d53f802f4d765787a7dd4b4 Mon Sep 17 00:00:00 2001 From: OrlandoCo Date: Tue, 8 Jun 2021 22:17:03 -0500 Subject: [PATCH] fix(sfu): Unbind downtrack locking --- pkg/sfu/receiver.go | 21 +++++++++++++++++++-- pkg/sfu/subscriber.go | 1 + 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index f4108e009..d9db6f53e 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -45,6 +45,7 @@ type WebRTCReceiver struct { trackID string streamID string kind webrtc.RTPCodecType + closed atomicBool bandwidth uint64 lastPli int64 stream string @@ -100,8 +101,11 @@ func (w *WebRTCReceiver) Kind() webrtc.RTPCodecType { } func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buffer, bestQualityFirst bool) { - var layer int + if w.closed.get() { + return + } + var layer int switch track.RID() { case fullResolution: layer = 2 @@ -167,6 +171,10 @@ func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote, buff *buffer.Buff } func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { + if w.closed.get() { + return + } + layer := 0 if w.isSimulcast { w.Lock() @@ -205,6 +213,10 @@ func (w *WebRTCReceiver) AddDownTrack(track *DownTrack, bestQualityFirst bool) { } func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack, layer int) error { + if w.closed.get() { + return errNoReceiverFound + } + if buf := w.buffers[layer]; buf != nil { w.locks[layer].Lock() w.pendingTracks[layer] = append(w.pendingTracks[layer], track) @@ -241,6 +253,10 @@ func (w *WebRTCReceiver) OnCloseHandler(fn func()) { // DeleteDownTrack removes a DownTrack from a Receiver func (w *WebRTCReceiver) DeleteDownTrack(layer int, id string) { + if w.closed.get() { + return + } + w.locks[layer].Lock() idx := -1 for i, dt := range w.downTracks[layer] { @@ -329,7 +345,8 @@ func (w *WebRTCReceiver) RetransmitPackets(track *DownTrack, packets []packetMet func (w *WebRTCReceiver) writeRTP(layer int) { defer func() { w.closeOnce.Do(func() { - go w.closeTracks() + w.closed.set(true) + w.closeTracks() }) }() diff --git a/pkg/sfu/subscriber.go b/pkg/sfu/subscriber.go index 837dfdce6..a2a7fca5d 100644 --- a/pkg/sfu/subscriber.go +++ b/pkg/sfu/subscriber.go @@ -155,6 +155,7 @@ func (s *Subscriber) RemoveDownTrack(streamID string, downTrack *DownTrack) { for i, dt := range dts { if dt == downTrack { idx = i + break } } if idx >= 0 {