Skip to content

Commit

Permalink
Expose WebRTC peerconn stats (#983)
Browse files Browse the repository at this point in the history
* add WebRTC peer connection transport stats

* save all peer connections in tracker-client and make stats available when seeding

* make offer ID keys into readable strings

* handle unsupported Peer Conn stats on WASM
  • Loading branch information
marcovidonis authored Oct 3, 2024
1 parent 5fe6c49 commit d112dd8
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 10 deletions.
13 changes: 13 additions & 0 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
"golang.org/x/sync/errgroup"

"github.com/anacrolix/torrent/bencode"
Expand Down Expand Up @@ -3002,6 +3003,18 @@ func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
)
}

type webRtcStatsReports map[string]webrtc.StatsReport

func (t *Torrent) GetWebRtcPeerConnStats() map[string]webRtcStatsReports {
stats := make(map[string]webRtcStatsReports)
trackersMap := t.cl.websocketTrackers.clients
for i, trackerClient := range trackersMap {
ts := trackerClient.RtcPeerConnStats()
stats[i] = ts
}
return stats
}

type requestState struct {
peer *Peer
when time.Time
Expand Down
15 changes: 15 additions & 0 deletions webtorrent/peer-conn-stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//go:build !js
// +build !js

package webtorrent

import (
"github.com/pion/webrtc/v3"
)

func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) {
if pc != nil {
stats = pc.GetStats()
}
return
}
13 changes: 13 additions & 0 deletions webtorrent/peer-conn-stats_js.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build js && wasm
// +build js,wasm

package webtorrent

import (
"github.com/pion/webrtc/v3"
)

// webrtc.PeerConnection.GetStats() is not currently supported for WASM. Return empty stats.
func GetPeerConnStats(pc *wrappedPeerConnection) (stats webrtc.StatsReport) {
return
}
53 changes: 43 additions & 10 deletions webtorrent/tracker-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type TrackerClient struct {

WebsocketTrackerHttpHeader func() http.Header
ICEServers []webrtc.ICEServer

rtcPeerConns map[string]*wrappedPeerConnection
}

func (me *TrackerClient) Stats() TrackerClientStats {
Expand Down Expand Up @@ -234,17 +236,22 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
return fmt.Errorf("creating offer: %w", err)
}

// save the leecher peer connections
tc.storePeerConnection(fmt.Sprintf("%x", randOfferId[:]), pc)

pc.OnClose(func() {
delete(tc.rtcPeerConns, offerIDBinary)
})

tc.Logger.Levelf(log.Debug, "announcing offer")
err = tc.announce(event, infoHash, []outboundOffer{
{
offerId: offerIDBinary,
outboundOfferValue: outboundOfferValue{
originalOffer: offer,
peerConnection: pc,
infoHash: infoHash,
dataChannel: dc,
},
},
err = tc.announce(event, infoHash, []outboundOffer{{
offerId: offerIDBinary,
outboundOfferValue: outboundOfferValue{
originalOffer: offer,
peerConnection: pc,
infoHash: infoHash,
dataChannel: dc,
}},
})
if err != nil {
dc.Close()
Expand Down Expand Up @@ -293,6 +300,19 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte
return nil
}

// Calculate the stats for all the peer connections the moment they are requested.
// As the stats will change over the life of a peer connection, this ensures that
// the updated values are returned.
func (tc *TrackerClient) RtcPeerConnStats() map[string]webrtc.StatsReport {
tc.mu.Lock()
defer tc.mu.Unlock()
sr := make(map[string]webrtc.StatsReport)
for id, pc := range tc.rtcPeerConns {
sr[id] = GetPeerConnStats(pc)
}
return sr
}

func (tc *TrackerClient) writeMessage(data []byte) error {
for tc.wsConn == nil {
if tc.closed {
Expand Down Expand Up @@ -359,6 +379,10 @@ func (tc *TrackerClient) handleOffer(
if err != nil {
return fmt.Errorf("creating answering peer connection: %w", err)
}

// save the seeder peer connections
tc.storePeerConnection(fmt.Sprintf("%x", offerContext.Id[:]), peerConnection)

response := AnnounceResponse{
Action: "announce",
InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
Expand Down Expand Up @@ -401,3 +425,12 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash)
}

func (tc *TrackerClient) storePeerConnection(offerId string, pc *wrappedPeerConnection) {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.rtcPeerConns == nil {
tc.rtcPeerConns = make(map[string]*wrappedPeerConnection)
}
tc.rtcPeerConns[offerId] = pc
}
19 changes: 19 additions & 0 deletions webtorrent/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,35 @@ type wrappedPeerConnection struct {
pproffd.CloseWrapper
span trace.Span
ctx context.Context

onCloseHandler func()
}

func (me *wrappedPeerConnection) Close() error {
me.closeMu.Lock()
defer me.closeMu.Unlock()

me.onClose()

err := me.CloseWrapper.Close()
me.span.End()
return err
}

func (me *wrappedPeerConnection) OnClose(f func()) {
me.closeMu.Lock()
defer me.closeMu.Unlock()
me.onCloseHandler = f
}

func (me *wrappedPeerConnection) onClose() {
handler := me.onCloseHandler

if handler != nil {
handler()
}
}

func newPeerConnection(logger log.Logger, iceServers []webrtc.ICEServer) (*wrappedPeerConnection, error) {
newPeerConnectionMu.Lock()
defer newPeerConnectionMu.Unlock()
Expand Down

0 comments on commit d112dd8

Please sign in to comment.