Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Nack interceptors #4

Merged
merged 1 commit into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pion/interceptor
go 1.15

require (
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.2
github.com/stretchr/testify v1.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
Expand Down
12 changes: 0 additions & 12 deletions nack.go

This file was deleted.

6 changes: 6 additions & 0 deletions pkg/nack/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package nack

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")
158 changes: 158 additions & 0 deletions pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package nack

import (
"math/rand"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
size uint16
skipLastN uint16
interval time.Duration
receiveLogs *sync.Map
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger

remoteStreamBuf rtp.Packet
}

// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
r := &GeneratorInterceptor{
size: 8192,
skipLastN: 0,
interval: time.Millisecond * 100,
receiveLogs: &sync.Map{},
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
}

for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}

if _, err := newReceiveLog(r.size); err != nil {
return nil, err
}

return r, nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
n.m.Lock()
defer n.m.Unlock()

if n.isClosed() {
return writer
}

n.wg.Add(1)

go n.loop(writer)

return writer
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
if !streamSupportNack(info) {
return reader
}

// error is already checked in NewGeneratorInterceptor
receiveLog, _ := newReceiveLog(n.size)
n.receiveLogs.Store(info.SSRC, receiveLog)

return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(b, a)
if err != nil {
return 0, nil, err
}

if err = n.remoteStreamBuf.Unmarshal(b[:i]); err != nil {
return 0, nil, err
}
receiveLog.add(n.remoteStreamBuf.Header.SequenceNumber)

return i, attr, nil
})
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.receiveLogs.Delete(info.SSRC)
}

// Close closes the interceptor
func (n *GeneratorInterceptor) Close() error {
defer n.wg.Wait()
n.m.Lock()
defer n.m.Unlock()

if !n.isClosed() {
close(n.close)
}

return nil
}

func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer n.wg.Done()

senderSSRC := rand.Uint32() // #nosec

ticker := time.NewTicker(n.interval)
for {
select {
case <-ticker.C:
n.receiveLogs.Range(func(key, value interface{}) bool {
ssrc := key.(uint32)
receiveLog := value.(*receiveLog)

missing := receiveLog.missingSeqNumbers(n.skipLastN)
if len(missing) == 0 {
return true
}

nack := &rtcp.TransportLayerNack{
SenderSSRC: senderSSRC,
MediaSSRC: ssrc,
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
}

if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending nack: %+v", err)
}

return true
})

case <-n.close:
return
}
}
}

func (n *GeneratorInterceptor) isClosed() bool {
select {
case <-n.close:
return true
default:
return false
}
}
70 changes: 70 additions & 0 deletions pkg/nack/generator_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package nack

import (
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestGeneratorInterceptor(t *testing.T) {
const interval = time.Millisecond * 10
i, err := NewGeneratorInterceptor(
GeneratorSize(64),
GeneratorSkipLastN(2),
GeneratorInterval(interval),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})

select {
case r := <-stream.ReadRTP():
assert.NoError(t, r.Err)
assert.Equal(t, seqNum, r.Packet.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("receiver rtp packet not found")
}
}

time.Sleep(interval * 2) // wait for at least 2 nack packets

select {
case <-stream.WrittenRTCP():
// ignore the first nack, it might only contain the sequence id 13 as missing
default:
}

select {
case pkts := <-stream.WrittenRTCP():
assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected")

p, ok := pkts[0].(*rtcp.TransportLayerNack)
assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0])

assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtcp packet not found")
}
}

func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
_, err := NewGeneratorInterceptor(GeneratorSize(5))
assert.Error(t, err, ErrInvalidSize)
}
44 changes: 44 additions & 0 deletions pkg/nack/generator_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package nack

import (
"time"

"github.com/pion/logging"
)

// GeneratorOption can be used to configure GeneratorInterceptor
type GeneratorOption func(r *GeneratorInterceptor) error

// GeneratorSize sets the size of the interceptor.
// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
func GeneratorSize(size uint16) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.size = size
return nil
}
}

// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating
// nack requests.
func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.skipLastN = skipLastN
return nil
}
}

// GeneratorLog sets a logger for the interceptor
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.log = log
return nil
}
}

// GeneratorInterval sets the nack send interval for the interceptor
func GeneratorInterval(interval time.Duration) GeneratorOption {
return func(r *GeneratorInterceptor) error {
r.interval = interval
return nil
}
}
14 changes: 14 additions & 0 deletions pkg/nack/nack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
package nack

import "github.com/pion/interceptor"

func streamSupportNack(info *interceptor.StreamInfo) bool {
for _, fb := range info.RTCPFeedback {
if fb.Type == "nack" && fb.Parameter == "" {
return true
}
}

return false
}
Loading