From 07afb0fd5597104a880f0e0a38817625b5d272c4 Mon Sep 17 00:00:00 2001 From: Adam Kiss Date: Fri, 27 Nov 2020 17:49:02 +0100 Subject: [PATCH] Add Nack Interceptors * ResponderInterceptor which responds to NACK Requests * GeneratorInterceptor which generates NACK Requests --- go.mod | 1 + go.sum | 2 + nack.go | 12 -- pkg/nack/errors.go | 6 + pkg/nack/generator_interceptor.go | 158 +++++++++++++++++++++++++ pkg/nack/generator_interceptor_test.go | 70 +++++++++++ pkg/nack/generator_option.go | 44 +++++++ pkg/nack/nack.go | 14 +++ pkg/nack/receive_log.go | 134 +++++++++++++++++++++ pkg/nack/receive_log_test.go | 137 +++++++++++++++++++++ pkg/nack/responder_interceptor.go | 114 ++++++++++++++++++ pkg/nack/responder_interceptor_test.go | 71 +++++++++++ pkg/nack/responder_option.go | 23 ++++ pkg/nack/send_buffer.go | 74 ++++++++++++ pkg/nack/send_buffer_test.go | 64 ++++++++++ 15 files changed, 912 insertions(+), 12 deletions(-) delete mode 100644 nack.go create mode 100644 pkg/nack/errors.go create mode 100644 pkg/nack/generator_interceptor.go create mode 100644 pkg/nack/generator_interceptor_test.go create mode 100644 pkg/nack/generator_option.go create mode 100644 pkg/nack/nack.go create mode 100644 pkg/nack/receive_log.go create mode 100644 pkg/nack/receive_log_test.go create mode 100644 pkg/nack/responder_interceptor.go create mode 100644 pkg/nack/responder_interceptor_test.go create mode 100644 pkg/nack/responder_option.go create mode 100644 pkg/nack/send_buffer.go create mode 100644 pkg/nack/send_buffer_test.go diff --git a/go.mod b/go.mod index a1e57d62..067c0d43 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/pion/interceptor go 1.15 require ( + github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.2 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 33c52ce8..d927bde0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo= diff --git a/nack.go b/nack.go deleted file mode 100644 index 736f562b..00000000 --- a/nack.go +++ /dev/null @@ -1,12 +0,0 @@ -package interceptor - -// NACK interceptor generates/responds to nack messages. -type NACK struct { - NoOp -} - -// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method -// will be called once per rtp packet. -func (n *NACK) BindRemoteStream(_ *StreamInfo, reader RTPReader) RTPReader { - return reader -} diff --git a/pkg/nack/errors.go b/pkg/nack/errors.go new file mode 100644 index 00000000..bbfc7736 --- /dev/null +++ b/pkg/nack/errors.go @@ -0,0 +1,6 @@ +package nack + +import "errors" + +// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied. +var ErrInvalidSize = errors.New("invalid buffer size") diff --git a/pkg/nack/generator_interceptor.go b/pkg/nack/generator_interceptor.go new file mode 100644 index 00000000..b02fa903 --- /dev/null +++ b/pkg/nack/generator_interceptor.go @@ -0,0 +1,158 @@ +package nack + +import ( + "math/rand" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// GeneratorInterceptor interceptor generates nack feedback messages. +type GeneratorInterceptor struct { + interceptor.NoOp + size uint16 + skipLastN uint16 + interval time.Duration + receiveLogs *sync.Map + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger + + remoteStreamBuf rtp.Packet +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor +func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) { + r := &GeneratorInterceptor{ + size: 8192, + skipLastN: 0, + interval: time.Millisecond * 100, + receiveLogs: &sync.Map{}, + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + if _, err := newReceiveLog(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + n.m.Lock() + defer n.m.Unlock() + + if n.isClosed() { + return writer + } + + n.wg.Add(1) + + go n.loop(writer) + + return writer +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + if !streamSupportNack(info) { + return reader + } + + // error is already checked in NewGeneratorInterceptor + receiveLog, _ := newReceiveLog(n.size) + n.receiveLogs.Store(info.SSRC, receiveLog) + + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + if err = n.remoteStreamBuf.Unmarshal(b[:i]); err != nil { + return 0, nil, err + } + receiveLog.add(n.remoteStreamBuf.Header.SequenceNumber) + + return i, attr, nil + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.receiveLogs.Delete(info.SSRC) +} + +// Close closes the interceptor +func (n *GeneratorInterceptor) Close() error { + defer n.wg.Wait() + n.m.Lock() + defer n.m.Unlock() + + if !n.isClosed() { + close(n.close) + } + + return nil +} + +func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer n.wg.Done() + + senderSSRC := rand.Uint32() // #nosec + + ticker := time.NewTicker(n.interval) + for { + select { + case <-ticker.C: + n.receiveLogs.Range(func(key, value interface{}) bool { + ssrc := key.(uint32) + receiveLog := value.(*receiveLog) + + missing := receiveLog.missingSeqNumbers(n.skipLastN) + if len(missing) == 0 { + return true + } + + nack := &rtcp.TransportLayerNack{ + SenderSSRC: senderSSRC, + MediaSSRC: ssrc, + Nacks: rtcp.NackPairsFromSequenceNumbers(missing), + } + + if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed sending nack: %+v", err) + } + + return true + }) + + case <-n.close: + return + } + } +} + +func (n *GeneratorInterceptor) isClosed() bool { + select { + case <-n.close: + return true + default: + return false + } +} diff --git a/pkg/nack/generator_interceptor_test.go b/pkg/nack/generator_interceptor_test.go new file mode 100644 index 00000000..481a42c6 --- /dev/null +++ b/pkg/nack/generator_interceptor_test.go @@ -0,0 +1,70 @@ +package nack + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestGeneratorInterceptor(t *testing.T) { + const interval = time.Millisecond * 10 + i, err := NewGeneratorInterceptor( + GeneratorSize(64), + GeneratorSkipLastN(2), + GeneratorInterval(interval), + GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 1, + RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}}, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}) + + select { + case r := <-stream.ReadRTP(): + assert.NoError(t, r.Err) + assert.Equal(t, seqNum, r.Packet.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("receiver rtp packet not found") + } + } + + time.Sleep(interval * 2) // wait for at least 2 nack packets + + select { + case <-stream.WrittenRTCP(): + // ignore the first nack, it might only contain the sequence id 13 as missing + default: + } + + select { + case pkts := <-stream.WrittenRTCP(): + assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected") + + p, ok := pkts[0].(*rtcp.TransportLayerNack) + assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0]) + + assert.Equal(t, uint16(13), p.Nacks[0].PacketID) + assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtcp packet not found") + } +} + +func TestGeneratorInterceptor_InvalidSize(t *testing.T) { + _, err := NewGeneratorInterceptor(GeneratorSize(5)) + assert.Error(t, err, ErrInvalidSize) +} diff --git a/pkg/nack/generator_option.go b/pkg/nack/generator_option.go new file mode 100644 index 00000000..092f5db9 --- /dev/null +++ b/pkg/nack/generator_option.go @@ -0,0 +1,44 @@ +package nack + +import ( + "time" + + "github.com/pion/logging" +) + +// GeneratorOption can be used to configure GeneratorInterceptor +type GeneratorOption func(r *GeneratorInterceptor) error + +// GeneratorSize sets the size of the interceptor. +// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func GeneratorSize(size uint16) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.size = size + return nil + } +} + +// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating +// nack requests. +func GeneratorSkipLastN(skipLastN uint16) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.skipLastN = skipLastN + return nil + } +} + +// GeneratorLog sets a logger for the interceptor +func GeneratorLog(log logging.LeveledLogger) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.log = log + return nil + } +} + +// GeneratorInterval sets the nack send interval for the interceptor +func GeneratorInterval(interval time.Duration) GeneratorOption { + return func(r *GeneratorInterceptor) error { + r.interval = interval + return nil + } +} diff --git a/pkg/nack/nack.go b/pkg/nack/nack.go new file mode 100644 index 00000000..a658e7f3 --- /dev/null +++ b/pkg/nack/nack.go @@ -0,0 +1,14 @@ +// Package nack provides interceptors to implement sending and receiving negative acknowledgements +package nack + +import "github.com/pion/interceptor" + +func streamSupportNack(info *interceptor.StreamInfo) bool { + for _, fb := range info.RTCPFeedback { + if fb.Type == "nack" && fb.Parameter == "" { + return true + } + } + + return false +} diff --git a/pkg/nack/receive_log.go b/pkg/nack/receive_log.go new file mode 100644 index 00000000..8107f59a --- /dev/null +++ b/pkg/nack/receive_log.go @@ -0,0 +1,134 @@ +package nack + +import ( + "fmt" + "sync" +) + +type receiveLog struct { + packets []uint64 + size uint16 + end uint16 + started bool + lastConsecutive uint16 + m sync.RWMutex +} + +func newReceiveLog(size uint16) (*receiveLog, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 6; i < 16; i++ { + if size == 1< end (with counting for rollovers) + for i := s.end + 1; i != seq; i++ { + // clear packets between end and seq (these may contain packets from a "size" ago) + s.delReceived(i) + } + s.end = seq + + if s.lastConsecutive+1 == seq { + s.lastConsecutive = seq + } else if seq-s.lastConsecutive > s.size { + s.lastConsecutive = seq - s.size + s.fixLastConsecutive() // there might be valid packets at the beginning of the buffer now + } + case s.lastConsecutive+1 == seq: + // negative diff, seq < end (with counting for rollovers) + s.lastConsecutive = seq + s.fixLastConsecutive() // there might be other valid packets after seq + } + + s.setReceived(seq) +} + +func (s *receiveLog) get(seq uint16) bool { + s.m.RLock() + defer s.m.RUnlock() + + diff := s.end - seq + if diff >= uint16SizeHalf { + return false + } + + if diff >= s.size { + return false + } + + return s.getReceived(seq) +} + +func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 { + s.m.RLock() + defer s.m.RUnlock() + + until := s.end - skipLastN + if until-s.lastConsecutive >= uint16SizeHalf { + // until < s.lastConsecutive (counting for rollover) + return nil + } + + missingPacketSeqNums := make([]uint16, 0) + for i := s.lastConsecutive + 1; i != until+1; i++ { + if !s.getReceived(i) { + missingPacketSeqNums = append(missingPacketSeqNums, i) + } + } + + return missingPacketSeqNums +} + +func (s *receiveLog) setReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] |= 1 << (pos % 64) +} + +func (s *receiveLog) delReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] &^= 1 << (pos % 64) +} + +func (s *receiveLog) getReceived(seq uint16) bool { + pos := seq % s.size + return (s.packets[pos/64] & (1 << (pos % 64))) != 0 +} + +func (s *receiveLog) fixLastConsecutive() { + i := s.lastConsecutive + 1 + for ; i != s.end+1 && s.getReceived(i); i++ { + // find all consecutive packets + } + s.lastConsecutive = i - 1 +} diff --git a/pkg/nack/receive_log_test.go b/pkg/nack/receive_log_test.go new file mode 100644 index 00000000..75e5e274 --- /dev/null +++ b/pkg/nack/receive_log_test.go @@ -0,0 +1,137 @@ +package nack + +import ( + "fmt" + "reflect" + "testing" +) + +func TestReceivedBuffer(t *testing.T) { + for _, start := range []uint16{0, 1, 127, 128, 129, 511, 512, 513, 32767, 32768, 32769, 65407, 65408, 65409, 65534, 65535} { + start := start + + t.Run(fmt.Sprintf("StartFrom%d", start), func(t *testing.T) { + rl, err := newReceiveLog(128) + if err != nil { + t.Fatalf("%+v", err) + } + + all := func(min uint16, max uint16) []uint16 { + result := make([]uint16, 0) + for i := min; i != max+1; i++ { + result = append(result, i) + } + return result + } + join := func(parts ...[]uint16) []uint16 { + result := make([]uint16, 0) + for _, p := range parts { + result = append(result, p...) + } + return result + } + + add := func(nums ...uint16) { + for _, n := range nums { + seq := start + n + rl.add(seq) + } + } + + assertGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + if !rl.get(seq) { + t.Errorf("not found: %d", seq) + } + } + } + assertNOTGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + if rl.get(seq) { + t.Errorf("packet found for %d", seq) + } + } + } + assertMissing := func(skipLastN uint16, nums []uint16) { + t.Helper() + missing := rl.missingSeqNumbers(skipLastN) + if missing == nil { + missing = []uint16{} + } + want := make([]uint16, 0, len(nums)) + for _, n := range nums { + want = append(want, start+n) + } + if !reflect.DeepEqual(want, missing) { + t.Errorf("missing want/got %v / %v", want, missing) + } + } + assertLastConsecutive := func(lastConsecutive uint16) { + want := lastConsecutive + start + if rl.lastConsecutive != want { + t.Errorf("invalid lastConsecutive want %d got %d", want, rl.lastConsecutive) + } + } + + add(0) + assertGet(0) + assertMissing(0, []uint16{}) + assertLastConsecutive(0) // first element added + + add(all(1, 127)...) + assertGet(all(1, 127)...) + assertMissing(0, []uint16{}) + assertLastConsecutive(127) + + add(128) + assertGet(128) + assertNOTGet(0) + assertMissing(0, []uint16{}) + assertLastConsecutive(128) + + add(130) + assertGet(130) + assertNOTGet(1, 2, 129) + assertMissing(0, []uint16{129}) + assertLastConsecutive(128) + + add(333) + assertGet(333) + assertNOTGet(all(0, 332)...) + assertMissing(0, all(206, 332)) // all 127 elements missing before 333 + assertMissing(10, all(206, 323)) // skip last 10 packets (324-333) from check + assertLastConsecutive(205) // lastConsecutive is still out of the buffer + + add(329) + assertGet(329) + assertMissing(0, join(all(206, 328), all(330, 332))) + assertMissing(5, join(all(206, 328))) // skip last 5 packets (329-333) from check + assertLastConsecutive(205) + + add(all(207, 320)...) + assertGet(all(207, 320)...) + assertMissing(0, join([]uint16{206}, all(321, 328), all(330, 332))) + assertLastConsecutive(205) + + add(334) + assertGet(334) + assertNOTGet(206) + assertMissing(0, join(all(321, 328), all(330, 332))) + assertLastConsecutive(320) // head of buffer is full of consecutive packages + + add(all(322, 328)...) + assertGet(all(322, 328)...) + assertMissing(0, join([]uint16{321}, all(330, 332))) + assertLastConsecutive(320) + + add(321) + assertGet(321) + assertMissing(0, all(330, 332)) + assertLastConsecutive(329) // after adding a single missing packet, lastConsecutive should jump forward + }) + } +} diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go new file mode 100644 index 00000000..d054a956 --- /dev/null +++ b/pkg/nack/responder_interceptor.go @@ -0,0 +1,114 @@ +package nack + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// ResponderInterceptor responds to nack feedback messages +type ResponderInterceptor struct { + interceptor.NoOp + size uint16 + streams *sync.Map + log logging.LeveledLogger +} + +type localStream struct { + sendBuffer *sendBuffer + rtpWriter interceptor.RTPWriter +} + +// NewResponderInterceptor returns a new GeneratorInterceptor interceptor +func NewResponderInterceptor(opts ...ResponderOption) (*ResponderInterceptor, error) { + r := &ResponderInterceptor{ + size: 8192, + streams: &sync.Map{}, + log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"), + } + + for _, opt := range opts { + if err := opt(r); err != nil { + return nil, err + } + } + + if _, err := newSendBuffer(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + i, attr, err := reader.Read(b, a) + if err != nil { + return 0, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + if err != nil { + return 0, nil, err + } + for _, rtcpPacket := range pkts { + nack, ok := rtcpPacket.(*rtcp.TransportLayerNack) + if !ok { + continue + } + + go n.resendPackets(nack) + } + + return i, attr, err + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + if !streamSupportNack(info) { + return writer + } + + // error is already checked in NewGeneratorInterceptor + sendBuffer, _ := newSendBuffer(n.size) + n.streams.Store(info.SSRC, &localStream{sendBuffer: sendBuffer, rtpWriter: writer}) + + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + sendBuffer.add(&rtp.Packet{Header: *header, Payload: payload}) + + return writer.Write(header, payload, attributes) + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.streams.Delete(info.SSRC) +} + +func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { + v, ok := n.streams.Load(nack.MediaSSRC) + if !ok { + return + } + + stream := v.(*localStream) + + for i := range nack.Nacks { + nack.Nacks[i].Range(func(seq uint16) bool { + if p := stream.sendBuffer.get(seq); p != nil { + if _, err := stream.rtpWriter.Write(&p.Header, p.Payload, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed resending nacked packet: %+v", err) + } + } + + return true + }) + } +} diff --git a/pkg/nack/responder_interceptor_test.go b/pkg/nack/responder_interceptor_test.go new file mode 100644 index 00000000..5ff322d3 --- /dev/null +++ b/pkg/nack/responder_interceptor_test.go @@ -0,0 +1,71 @@ +package nack + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestResponderInterceptor(t *testing.T) { + i, err := NewResponderInterceptor( + ResponderSize(8), + ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 1, + RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}}, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{10, 11, 12, 14, 15} { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } + + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.TransportLayerNack{ + MediaSSRC: 1, + SenderSSRC: 2, + Nacks: []rtcp.NackPair{ + {PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15 + }, + }, + }) + + // seq number 13 was never sent, so it can't be resent + for _, seqNum := range []uint16{11, 12, 15} { + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } + + select { + case p := <-stream.WrittenRTP(): + t.Errorf("no more rtp packets expected, found sequence number: %v", p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + } +} + +func TestResponderInterceptor_InvalidSize(t *testing.T) { + _, err := NewResponderInterceptor(ResponderSize(5)) + assert.Error(t, err, ErrInvalidSize) +} diff --git a/pkg/nack/responder_option.go b/pkg/nack/responder_option.go new file mode 100644 index 00000000..7ad52c8a --- /dev/null +++ b/pkg/nack/responder_option.go @@ -0,0 +1,23 @@ +package nack + +import "github.com/pion/logging" + +// ResponderOption can be used to configure ResponderInterceptor +type ResponderOption func(s *ResponderInterceptor) error + +// ResponderSize sets the size of the interceptor. +// Size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func ResponderSize(size uint16) ResponderOption { + return func(r *ResponderInterceptor) error { + r.size = size + return nil + } +} + +// ResponderLog sets a logger for the interceptor +func ResponderLog(log logging.LeveledLogger) ResponderOption { + return func(r *ResponderInterceptor) error { + r.log = log + return nil + } +} diff --git a/pkg/nack/send_buffer.go b/pkg/nack/send_buffer.go new file mode 100644 index 00000000..cf3f020e --- /dev/null +++ b/pkg/nack/send_buffer.go @@ -0,0 +1,74 @@ +package nack + +import ( + "fmt" + + "github.com/pion/rtp" +) + +const ( + uint16SizeHalf = 1 << 15 +) + +type sendBuffer struct { + packets []*rtp.Packet + size uint16 + lastAdded uint16 + started bool +} + +func newSendBuffer(size uint16) (*sendBuffer, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 0; i < 16; i++ { + if size == 1<= uint16SizeHalf { + return nil + } + + if diff >= s.size { + return nil + } + + return s.packets[seq%s.size] +} diff --git a/pkg/nack/send_buffer_test.go b/pkg/nack/send_buffer_test.go new file mode 100644 index 00000000..afba2169 --- /dev/null +++ b/pkg/nack/send_buffer_test.go @@ -0,0 +1,64 @@ +package nack + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestSendBuffer(t *testing.T) { + for _, start := range []uint16{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 511, 512, 513, 32767, 32768, 32769, 65527, 65528, 65529, 65530, 65531, 65532, 65533, 65534, 65535} { + start := start + + sb, err := newSendBuffer(8) + assert.NoError(t, err) + + add := func(nums ...uint16) { + for _, n := range nums { + seq := start + n + sb.add(&rtp.Packet{Header: rtp.Header{SequenceNumber: seq}}) + } + } + + assertGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + packet := sb.get(seq) + if packet == nil { + t.Errorf("packet not found: %d", seq) + continue + } + if packet.SequenceNumber != seq { + t.Errorf("packet for %d returned with incorrect SequenceNumber: %d", seq, packet.SequenceNumber) + } + } + } + assertNOTGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + packet := sb.get(seq) + if packet != nil { + t.Errorf("packet found for %d: %d", seq, packet.SequenceNumber) + } + } + } + + add(0, 1, 2, 3, 4, 5, 6, 7) + assertGet(0, 1, 2, 3, 4, 5, 6, 7) + + add(8) + assertGet(8) + assertNOTGet(0) + + add(10) + assertGet(10) + assertNOTGet(1, 2, 9) + + add(22) + assertGet(22) + assertNOTGet(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21) + } +}