Skip to content

Commit

Permalink
Only apply WebRTC data channel write limit to webtorrent peer conns
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Oct 17, 2024
1 parent 05ab0ca commit edb5b09
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 34 deletions.
3 changes: 1 addition & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
gbtree "github.com/google/btree"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"

"github.com/anacrolix/torrent/bencode"
Expand Down Expand Up @@ -333,7 +332,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
ICEServers: cl.ICEServers(),
DialContext: cl.config.TrackerDialContext,
OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
cl.lock()
defer cl.unlock()
t, ok := cl.torrentsByShortHash[dcc.InfoHash]
Expand Down
4 changes: 2 additions & 2 deletions peer-conn-msg-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
}
var err error
for frontBuf.Len() != 0 {
// Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
next := frontBuf.Next(1<<16 - 1)
next := frontBuf.Bytes()
var n int
n, err = cn.w.Write(next)
frontBuf.Next(n)
if err == nil && n != len(next) {
panic("expected full write")
}
Expand Down
3 changes: 1 addition & 2 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/anacrolix/missinggo/v2/pubsub"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -1851,7 +1850,7 @@ func (t *Torrent) seeding() bool {
}

func (t *Torrent) onWebRtcConn(
c datachannel.ReadWriteCloser,
c webtorrent.DataChannelConn,
dcc webtorrent.DataChannelContext,
) {
defer c.Close()
Expand Down
4 changes: 2 additions & 2 deletions webrtc.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package torrent

import (
"io"
"net"
"strconv"
"time"

"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -17,7 +17,7 @@ import (
const webrtcNetwork = "webrtc"

type webrtcNetConn struct {
datachannel.ReadWriteCloser
io.ReadWriteCloser
webtorrent.DataChannelContext
}

Expand Down
3 changes: 1 addition & 2 deletions webtorrent/tracker-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/gorilla/websocket"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -85,7 +84,7 @@ func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidat
return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
}

type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
type onDataChannelOpen func(_ DataChannelConn, dcc DataChannelContext)

func (tc *TrackerClient) doWebsocket() error {
metrics.Add("websocket dials", 1)
Expand Down
79 changes: 57 additions & 22 deletions webtorrent/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"expvar"
"fmt"
"io"
"os"
"strconv"
"sync"
"time"

g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/panicif"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"
Expand Down Expand Up @@ -132,7 +135,7 @@ func (tc *TrackerClient) newOffer(
err = fmt.Errorf("creating data channel: %w", err)
peerConnection.Close()
}
initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
initDataChannel(dataChannel, peerConnection, func(dc DataChannelConn, dcCtx context.Context, dcSpan trace.Span) {
metrics.Add("outbound offers answered with datachannel", 1)
tc.mu.Lock()
tc.stats.ConvertedOutboundConns++
Expand Down Expand Up @@ -162,7 +165,7 @@ func (tc *TrackerClient) newOffer(
return
}

type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
type onDetachedDataChannelFunc func(detached DataChannelConn, ctx context.Context, span trace.Span)

func (tc *TrackerClient) initAnsweringPeerConnection(
peerConn *wrappedPeerConnection,
Expand All @@ -176,7 +179,7 @@ func (tc *TrackerClient) initAnsweringPeerConnection(
peerConn.Close()
})
peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
initDataChannel(d, peerConn, func(detached DataChannelConn, ctx context.Context, span trace.Span) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
Expand Down Expand Up @@ -225,13 +228,6 @@ func (tc *TrackerClient) newAnsweringPeerConnection(
return
}

type datachannelReadWriter interface {
datachannel.Reader
datachannel.Writer
io.Reader
io.Writer
}

type ioCloserFunc func() error

func (me ioCloserFunc) Close() error {
Expand All @@ -256,29 +252,68 @@ func initDataChannel(
// This shouldn't happen if the API is configured correctly, and we call from OnOpen.
panic(err)
}
onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
onOpen(wrapDataChannel(raw, pc, span, dc), ctx, span)
})
}

// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
// and responsibility for the PeerConnection.
func hookDataChannelCloser(
// WebRTC data channel wrapper that supports operating as a peer conn ReadWriteCloser.
type DataChannelConn struct {
ioCloserFunc
rawDataChannel datachannel.ReadWriteCloser
}

func (d DataChannelConn) Read(p []byte) (int, error) {
return d.rawDataChannel.Read(p)
}

// Limit write size for WebRTC data channels. See https://github.com/pion/datachannel/issues/59. The
// default used to be (1<<16)-1. This will be set to the new appropriate value if it's discovered to
// still be a limitation. Set WEBTORRENT_MAX_WRITE_SIZE to experiment with it.
var maxWriteSize = g.None[int]()

func init() {
s, ok := os.LookupEnv("WEBTORRENT_MAX_WRITE_SIZE")
if !ok {
return
}
i64, err := strconv.ParseInt(s, 0, 0)
panicif.Err(err)
maxWriteSize = g.Some(int(i64))
}

func (d DataChannelConn) Write(p []byte) (n int, err error) {
for {
p1 := p
if maxWriteSize.Ok {
p1 = p1[:min(len(p1), maxWriteSize.Value)]
}
var n1 int
n1, err = d.rawDataChannel.Write(p1)
n += n1
p = p[n1:]
if err != nil {
return
}
if len(p) == 0 {
return
}
}
}

func wrapDataChannel(
dcrwc datachannel.ReadWriteCloser,
pc *wrappedPeerConnection,
dataChannelSpan trace.Span,
originalDataChannel *webrtc.DataChannel,
) datachannel.ReadWriteCloser {
return struct {
datachannelReadWriter
io.Closer
}{
dcrwc,
ioCloserFunc(func() error {
) DataChannelConn {
return DataChannelConn{
ioCloserFunc: ioCloserFunc(func() error {
dcrwc.Close()
pc.Close()
originalDataChannel.Close()
dataChannelSpan.End()
return nil
}),
rawDataChannel: dcrwc,
}
}
3 changes: 1 addition & 2 deletions wstracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/anacrolix/log"
"github.com/gorilla/websocket"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"

"github.com/anacrolix/torrent/tracker"
Expand Down Expand Up @@ -43,7 +42,7 @@ type websocketTrackers struct {
PeerId [20]byte
Logger log.Logger
GetAnnounceRequest func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext)
OnConn func(webtorrent.DataChannelConn, webtorrent.DataChannelContext)
mu sync.Mutex
clients map[string]*refCountedWebtorrentTrackerClient
Proxy httpTracker.ProxyFunc
Expand Down

0 comments on commit edb5b09

Please sign in to comment.