Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The following emojis are used to highlight certain changes:

### Changed

- `provider`: previously, the code in this module was logging to the `reprovider.simple`, `provider.batched` and `provider.queue` facilities. These have now been consolidated into a single `provider` logging facility, along with some adjustments to logging levels and extra debug statements.
- `bitswap/client`: Added an opt-in ability to reduce bitswap broadcast volume by limiting broadcasts to peers on the local network and peers that have previously responded indicating they wanted blocks. The following bitswap client options are available to configure the behavior of broadcast reduction:
- `BroadcastControlEnable` enables or disables broadcast reduction logic. Setting this to `false` restores the previous broadcast behavior of sending broadcasts to all peers, and ignores all other `BroadcastControl` options. Default is `false` (disabled).
- `BroadcastControlMaxPeers` sets a hard limit on the number of peers to send broadcasts to. A value of `0` means no broadcasts are sent. A value of `-1` means there is no limit. Default is `-1` (unlimited).
Expand All @@ -32,7 +33,6 @@ The following emojis are used to highlight certain changes:
- `BroadcastControlMaxRandomPeers` sets the number of peers to broadcast to anyway, even though broadcast control logic has determined that they are not broadcast targets. Setting this to a non-zero value ensures at least this number of random peers receives a broadcast. This may be helpful in cases where peers that are not receiving broadcasts may have wanted blocks. Default is `0` (no random broadcasts).
- `BroadcastControlSendToPendingPeers` enables or disables sending broadcasts to any peers to which there is a pending message to send. When `true` (enabled), this sends broadcasts to many more peers, but does so in a way that does not increase the number of separate broadcast messages. There is still the increased cost of the recipients having to process and respond to the broadcasts. Default is `false`.


### Removed

- `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)
Expand Down
14 changes: 7 additions & 7 deletions provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("provider.queue")
var log = logging.Logger("provider")

const (
// batchSize is the limit on number of CIDs kept in memory at which there
Expand Down Expand Up @@ -142,7 +142,7 @@ func (q *Queue) worker(ctx context.Context) {
defer func() {
if c != cid.Undef {
if err := q.ds.Put(ctx, k, c.Bytes()); err != nil {
log.Errorw("Failed to write cid to datastore", "err", err)
log.Errorw("provider queue: failed to write cid to datastore", "err", err)
}
counter++
}
Expand Down Expand Up @@ -174,18 +174,18 @@ func (q *Queue) worker(ctx context.Context) {
if !dsEmpty {
head, err := q.getQueueHead(ctx)
if err != nil {
log.Errorw("Error querying for head of queue, stopping provider", "err", err)
log.Errorw("provider queue: error querying for head of queue, stopping provider", "err", err)
return
}
if head != nil {
k = datastore.NewKey(head.Key)
if err = q.ds.Delete(ctx, k); err != nil {
log.Errorw("Error deleting queue entry, stopping provider", "err", err, "key", head.Key)
log.Errorw("provider queue: error deleting queue entry, stopping provider", "err", err, "key", head.Key)
return
}
c, err = cid.Parse(head.Value)
if err != nil {
log.Warnw("Error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
log.Warnw("provider queue: error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
continue
}
} else {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (q *Queue) worker(ctx context.Context) {
err = q.commitInput(ctx, counter, &inBuf)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorw("Error writing CIDs to datastore, stopping provider", "err", err)
log.Errorw("provider queue: error writing CIDs to datastore, stopping provider", "err", err)
}
return
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq
c := cids.At(i)
key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
if err = b.Put(ctx, key, c.Bytes()); err != nil {
log.Errorw("Failed to add cid to batch", "err", err)
log.Errorw("provider queue: failed to add cid to batch", "err", err)
continue
}
counter++
Expand Down
9 changes: 3 additions & 6 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import (
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
logging "github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
)

var logR = logging.Logger("reprovider.simple")

// Provider announces blocks to the network
type Provider interface {
// Provide takes a cid and makes an attempt to announce it to the network
Expand Down Expand Up @@ -107,7 +104,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
// 1. Recursive keys
for sc := range pinning.RecursiveKeys(ctx, false) {
if sc.Err != nil {
logR.Errorf("reprovide recursive pins: %s", sc.Err)
log.Errorf("reprovide recursive pins: %s", sc.Err)
return
}
if !onlyRoots {
Expand All @@ -120,7 +117,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
// 2. Direct pins
for sc := range pinning.DirectKeys(ctx, false) {
if sc.Err != nil {
logR.Errorf("reprovide direct pins: %s", sc.Err)
log.Errorf("reprovide direct pins: %s", sc.Err)
return
}
_ = set.Visitor(ctx)(sc.Pin.Key)
Expand All @@ -143,7 +140,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
})
})
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
log.Errorf("reprovide indirect pins: %s", err)
return
}
}()
Expand Down
12 changes: 7 additions & 5 deletions provider/reprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
provideDelayWarnDuration = 15 * time.Second
)

var log = logging.Logger("provider.batched")
var log = logging.Logger("provider")

type reprovider struct {
ctx context.Context
Expand Down Expand Up @@ -262,6 +262,7 @@ func (s *reprovider) provideWorker() {
provCh := s.q.Dequeue()

provideFunc := func(ctx context.Context, c cid.Cid) {
log.Debugf("provider worker: providing %s", c)
if err := s.rsys.Provide(ctx, c, true); err != nil {
log.Errorf("failed to provide %s: %s", c, err)
}
Expand Down Expand Up @@ -366,7 +367,7 @@ func (s *reprovider) waitUntilProvideSystemReady() {
ticker = time.NewTicker(time.Minute)
defer ticker.Stop()
}
log.Debugf("reprovider system not ready")
log.Infof("reprovider system not ready, waiting 1m")
select {
case <-ticker.C:
case <-s.ctx.Done():
Expand Down Expand Up @@ -453,16 +454,16 @@ func (s *reprovider) Reprovide(ctx context.Context) error {

s.waitUntilProvideSystemReady()

log.Debugf("starting reprovide of %d keys", len(keys))
log.Infof("starting reprovide of %d keys", len(keys))
start := time.Now()
err := doProvideMany(s.ctx, s.rsys, keys)
if err != nil {
log.Debugf("reproviding failed %v", err)
log.Errorf("reproviding failed %v", err)
continue
}
dur := time.Since(start)
recentAvgProvideDuration := dur / time.Duration(len(keys))
log.Debugf("finished reproviding %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration)
log.Infof("finished reproviding %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration)

totalProvideTime := time.Duration(s.totalReprovides) * s.avgReprovideDuration
s.statLk.Lock()
Expand Down Expand Up @@ -538,6 +539,7 @@ func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) e
}

for _, k := range keys {
log.Debugf("providing %s", k)
if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil {
return err
}
Expand Down