Skip to content

Commit

Permalink
Tidy up cmd/torrent cleanup and websocket trackers logging
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Aug 13, 2024
1 parent 2502dd2 commit f471182
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 25 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {

cl.websocketTrackers = websocketTrackers{
PeerId: cl.peerID,
Logger: cl.logger,
Logger: cl.logger.WithNames("websocketTrackers"),
GetAnnounceRequest: func(
event tracker.AnnounceEvent, infoHash [20]byte,
) (
Expand Down
2 changes: 1 addition & 1 deletion cmd/torrent/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func downloadErr(flags downloadFlags) error {
<-ctx.Done()
}
}
spew.Dump(expvar.Get("torrent").(*expvar.Map).Get("chunks received"))
fmt.Printf("chunks received: %v\n", &torrent.ChunksReceived)
spew.Dump(client.ConnStats())
clStats := client.ConnStats()
sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64()
Expand Down
11 changes: 9 additions & 2 deletions cmd/torrent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
app "github.com/anacrolix/gostdapp"
"io"
stdLog "log"
"net/http"
Expand Down Expand Up @@ -39,12 +40,15 @@ func shutdownTracerProvider(ctx context.Context, tp *trace.TracerProvider) {
}

func main() {
app.RunContext(mainErr)
}

func mainErr(ctx context.Context) error {
defer stdLog.SetFlags(stdLog.Flags() | stdLog.Lshortfile)

ctx := context.Background()
tracingExporter, err := otlptracegrpc.New(ctx)
if err != nil {
log.Fatalf("creating tracing exporter: %v", err)
return fmt.Errorf("creating tracing exporter: %w", err)
}
tracerProvider := trace.NewTracerProvider(trace.WithBatcher(tracingExporter))
defer shutdownTracerProvider(ctx, tracerProvider)
Expand Down Expand Up @@ -148,5 +152,8 @@ func main() {
bargle.Subcommand{Name: "serve", Command: serve()},
bargle.Subcommand{Name: "create", Command: create()},
)
// Well this sux, this old version of bargle doesn't return so we can let the gostdapp Context
// clean up.
main.Run()
return nil
}
6 changes: 4 additions & 2 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ func defaultPeerExtensionBytes() PeerExtensionBits {

func init() {
torrent.Set("peers supporting extension", &peersSupportingExtension)
torrent.Set("chunks received", &chunksReceived)
torrent.Set("chunks received", &ChunksReceived)
}

// I could move a lot of these counters to their own file, but I suspect they
// may be attached to a Client someday.
var (
torrent = expvar.NewMap("torrent")
peersSupportingExtension expvar.Map
chunksReceived expvar.Map
// This could move at any time. It contains counts of chunks received and the conditions they
// were received.
ChunksReceived expvar.Map

pieceHashedCorrect = expvar.NewInt("pieceHashedCorrect")
pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ github.com/alexflint/go-scalar v1.1.0 h1:aaAouLLzI9TChcPXotr6gUhq+Scr8rl0P9P4Pnl
github.com/alexflint/go-scalar v1.1.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o=
github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e h1:A0Ty9UeyBDIo29ZMnk0AvPqWDIa4HVvCaJqWNlCrMXA=
github.com/anacrolix/backtrace v0.0.0-20221205112523-22a61db8f82e/go.mod h1:4YFqy+788tLJWtin2jNliYVJi+8aDejG9zcu/2/pONw=
github.com/anacrolix/bargle v0.0.0-20220630015206-d7a4d433886a h1:KCP9QvHlLoUQBOaTf/YCuOzG91Ym1cPB6S68O4Q3puo=
github.com/anacrolix/bargle v0.0.0-20220630015206-d7a4d433886a/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ=
github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d h1:ypNOsIwvdumNRlqWj/hsnLs5TyQWQOylwi+T9Qs454A=
github.com/anacrolix/bargle v0.0.0-20221014000746-4f2739072e9d/go.mod h1:9xUiZbkh+94FbiIAL1HXpAIBa832f3Mp07rRPl5c5RQ=
github.com/anacrolix/chansync v0.4.1-0.20240627045151-1aa1ac392fe8 h1:eyb0bBaQKMOh5Se/Qg54shijc8K4zpQiOjEhKFADkQM=
Expand Down
12 changes: 6 additions & 6 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (c *Peer) doChunkReadStats(size int64) {

// Handle a received chunk from a peer.
func (c *Peer) receiveChunk(msg *pp.Message) error {
chunksReceived.Add("total", 1)
ChunksReceived.Add("total", 1)

ppReq := newRequestFromMessage(msg)
t := c.t
Expand All @@ -628,17 +628,17 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
defer recordBlockForSmartBan()

if c.peerChoking {
chunksReceived.Add("while choked", 1)
ChunksReceived.Add("while choked", 1)
}

if c.validReceiveChunks[req] <= 0 {
chunksReceived.Add("unexpected", 1)
ChunksReceived.Add("unexpected", 1)
return errors.New("received unexpected chunk")
}
c.decExpectedChunkReceive(req)

if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
ChunksReceived.Add("due to allowed fast", 1)
}

// The request needs to be deleted immediately to prevent cancels occurring asynchronously when
Expand All @@ -661,7 +661,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
c.updateRequests("Peer.receiveChunk deleted request")
}
} else {
chunksReceived.Add("unintended", 1)
ChunksReceived.Add("unintended", 1)
}
}

Expand All @@ -670,7 +670,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// Do we actually want this chunk?
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
chunksReceived.Add("redundant", 1)
ChunksReceived.Add("redundant", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1910,10 +1910,11 @@ func (t *Torrent) startWebsocketAnnouncer(u url.URL, shortInfohash [20]byte) tor
go func() {
err := wtc.Announce(tracker.Started, shortInfohash)
if err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf(
"error in initial announce to %q: %v",
u.String(), err,
)
level := log.Warning
if t.closed.IsSet() {
level = log.Debug
}
t.logger.Levelf(level, "error doing initial announce to %q: %v", u.String(), err)
}
}()
return wst
Expand Down
11 changes: 9 additions & 2 deletions webtorrent/tracker-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
return fmt.Errorf("creating offer: %w", err)
}

tc.Logger.Levelf(log.Debug, "announcing offer")
err = tc.announce(event, infoHash, []outboundOffer{
{
offerId: offerIDBinary,
Expand Down Expand Up @@ -308,7 +309,7 @@ func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
if err != nil {
return fmt.Errorf("read message error: %w", err)
}
// tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
tc.Logger.Levelf(log.Debug, "received message: %q", message)

var ar AnnounceResponse
if err := json.Unmarshal(message, &ar); err != nil {
Expand All @@ -333,7 +334,13 @@ func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
case ar.Answer != nil:
tc.handleAnswer(ar.OfferID, *ar.Answer)
default:
tc.Logger.Levelf(log.Warning, "unhandled announce response %q", message)
// wss://tracker.openwebtorrent.com appears to respond to an initial announces without
// an offer or answer. I think that's fine. Let's check it at least contains an
// infohash.
_, err := jsonStringToInfoHash(ar.InfoHash)
if err != nil {
tc.Logger.Levelf(log.Warning, "unexpected announce response %q", message)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion webtorrent/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newPeerConnection(logger log.Logger, iceServers []webrtc.ICEServer) (*wrapp
}
// If the state change handler intends to call Close, it should call it on the wrapper.
wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
logger.Levelf(log.Debug, "webrtc PeerConnection state changed to %v", state)
span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
})
return wpc, nil
Expand Down
14 changes: 10 additions & 4 deletions wstracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,23 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra
defer me.mu.Unlock()
value, ok := me.clients[url]
if !ok {
dialer := &websocket.Dialer{Proxy: me.Proxy, NetDialContext: me.DialContext, HandshakeTimeout: websocket.DefaultDialer.HandshakeTimeout}
dialer := &websocket.Dialer{
Proxy: me.Proxy,
NetDialContext: me.DialContext,
HandshakeTimeout: websocket.DefaultDialer.HandshakeTimeout,
}
value = &refCountedWebtorrentTrackerClient{
TrackerClient: webtorrent.TrackerClient{
Dialer: dialer,
Url: url,
GetAnnounceRequest: me.GetAnnounceRequest,
PeerId: me.PeerId,
OnConn: me.OnConn,
Logger: me.Logger.WithText(func(m log.Msg) string {
return fmt.Sprintf("tracker client for %q: %v", url, m)
}),
Logger: me.Logger.WithText(
func(m log.Msg) string {
return fmt.Sprintf("tracker client for %q: %v", url, m)
},
),
WebsocketTrackerHttpHeader: me.WebsocketTrackerHttpHeader,
ICEServers: me.ICEServers,
},
Expand Down

0 comments on commit f471182

Please sign in to comment.