Skip to content

Commit

Permalink
Fix errors in webseed causing very long stalls in requesting
Browse files Browse the repository at this point in the history
Reuse the "too fast" error handling for all errors when requesting from webseeds. This prevents long stalls due to common errors.
  • Loading branch information
anacrolix committed Aug 27, 2024
1 parent 3b0b61f commit b3aea1a
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/anacrolix/torrent

go 1.22
go 1.23

require (
github.com/RoaringBitmap/roaring v1.2.3
Expand Down
15 changes: 14 additions & 1 deletion ordered-bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package torrent
import (
g "github.com/anacrolix/generics"
list "github.com/bahlo/generic-list-go"
"iter"

"github.com/anacrolix/torrent/typed-roaring"
)
Expand Down Expand Up @@ -41,12 +42,24 @@ func (o *orderedBitmap[T]) Rank(index T) uint64 {
return o.bitmap.Rank(index)
}

func (o *orderedBitmap[T]) Iterate(f func(T) bool) {
func (o *orderedBitmap[T]) Iterate(f func(T) bool) (all bool) {
for e := o.order.Front(); e != nil; e = e.Next() {
if !f(e.Value) {
return
}
}
all = true
return
}

func (o *orderedBitmap[T]) Iterator() iter.Seq[T] {
return func(yield func(T) bool) {
for e := o.order.Front(); e != nil; e = e.Next() {
if !yield(e.Value) {
return
}
}
}
}

func (o *orderedBitmap[T]) CheckedRemove(index T) bool {
Expand Down
5 changes: 4 additions & 1 deletion request-strategy/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package requestStrategy

import (
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"iter"
)

type PeerRequestState struct {
Expand All @@ -24,7 +25,9 @@ type PeerRequests interface {
// See roaring.Bitmap.Rank.
Rank(RequestIndex) uint64
// Must yield in order items were added.
Iterate(func(RequestIndex) bool)
Iterate(func(RequestIndex) bool) (all bool)
// Must yield in order items were added.
Iterator() iter.Seq[RequestIndex]
// See roaring.Bitmap.CheckedRemove.
CheckedRemove(RequestIndex) bool
// Iterate a snapshot of the values. It is safe to mutate the underlying data structure.
Expand Down
2 changes: 1 addition & 1 deletion tests/webseed-partial-seed/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/anacrolix/torrent/tests/webseed-partial-seed

go 1.22.3
go 1.23

require (
github.com/anacrolix/torrent v1.56.1
Expand Down
59 changes: 30 additions & 29 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
)

const (
webseedPeerUnhandledErrorSleep = 5 * time.Second
webseedPeerCloseOnUnhandledError = false
)

Expand Down Expand Up @@ -76,50 +75,52 @@ func (ws *webseedPeer) _request(r Request) bool {
return true
}

func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
// Returns true if we should look for another request to start. Returns false if we handled this
// one.
func (ws *webseedPeer) requestIteratorLocked(requesterIndex int, x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
}
webseedRequest := ws.client.StartNewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest
err := func() error {
ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()
return ws.requestResultHandler(r, webseedRequest)
}()
delete(ws.activeRequests, r)
return err
ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()
if err != nil {
level := log.Warning
if errors.Is(err, context.Canceled) {
level = log.Debug
}
ws.peer.logger.Levelf(level, "requester %v: error doing webseed request %v: %v", requesterIndex, r, err)
// This used to occur only on webseed.ErrTooFast but I think it makes sense to slow down
// any kind of error. There are maxRequests (in Torrent.addWebSeed) requestors bouncing
// around it doesn't hurt to slow a few down if there are issues.
select {
case <-ws.peer.closed.Done():
case <-time.After(time.Duration(rand.Int63n(int64(10 * time.Second)))):
}
}
return false

}

func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false
ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
}
err := ws.doRequest(r)
ws.requesterCond.L.Unlock()
if err != nil && !errors.Is(err, context.Canceled) {
ws.peer.logger.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
}
restart = true
if errors.Is(err, webseed.ErrTooFast) {
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
for reqIndex := range ws.peer.requestState.Requests.Iterator() {
if !ws.requestIteratorLocked(i, reqIndex) {
goto start
}
// Demeter is throwing a tantrum on Mount Olympus for this
ws.peer.t.cl.locker().RLock()
duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
ws.peer.t.cl.locker().RUnlock()
time.Sleep(duration)
ws.requesterCond.L.Lock()
return false
})
if restart {
goto start
}
// Found no requests to handle, so wait.
ws.requesterCond.Wait()
}
}
Expand Down
4 changes: 2 additions & 2 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type RequestResult struct {
Err error
}

func (ws *Client) NewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.Background())
func (ws *Client) StartNewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.TODO())
var requestParts []requestPart
if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
req, err := newRequest(
Expand Down

0 comments on commit b3aea1a

Please sign in to comment.