diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7227a8832..7c4549921 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ The following emojis are used to highlight certain changes:
 
 ### Changed
 
+- `provider`: previously, the code in this module logged to the `reprovider.simple`, `provider.batched`, and `provider.queue` facilities. These have now been consolidated into a single `provider` logging facility, along with some adjustments to logging levels and extra debug statements.
 - `bitswap/client`: Added an opt-in ability to reduce bitswap broadcast volume by limiting broadcasts to peers that have previously responded as having wanted blocks and peers on local network. The following bitswap client options are available to configure the behavior of broadcast reduction:
   - `BroadcastControlEnable` enables or disables broadcast reduction logic. Setting this to `false` restores the previous broadcast behavior of sending broadcasts to all peers, and ignores all other `BroadcastControl` options. Default is `false` (disabled).
   - `BroadcastControlMaxPeers` sets a hard limit on the number of peers to send broadcasts to. A value of `0` means no broadcasts are sent. A value of `-1` means there is no limit. Default is `-1` (unlimited).
@@ -32,7 +33,6 @@ The following emojis are used to highlight certain changes:
   - `BroadcastControlMaxRandomPeers` sets the number of peers to broadcast to anyway, even though broadcast control logic has determined that they are not broadcast targets. Setting this to a non-zero value ensures at least this number of random peers receives a broadcast. This may be helpful in cases where peers that are not receiving broadcasts may have wanted blocks. Default is `0` (no random broadcasts).
   - `BroadcastControlSendToPendingPeers` enables or disables sending broadcasts to any peers to which there is a pending message to send. When `true` (enabled), this sends broadcasts to many more peers, but does so in a way that does not increase the number of separate broadcast messages. There is still the increased cost of the recipients having to process and respond to the broadcasts. Default is `false`.
-
 
 ### Removed
 
 - `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)
diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go
index 8dcccb178..da61cd6b3 100644
--- a/provider/internal/queue/queue.go
+++ b/provider/internal/queue/queue.go
@@ -17,7 +17,7 @@ import (
     logging "github.com/ipfs/go-log/v2"
 )
 
-var log = logging.Logger("provider.queue")
+var log = logging.Logger("provider")
 
 const (
     // batchSize is the limit on number of CIDs kept in memory at which there
@@ -142,7 +142,7 @@ func (q *Queue) worker(ctx context.Context) {
     defer func() {
         if c != cid.Undef {
             if err := q.ds.Put(ctx, k, c.Bytes()); err != nil {
-                log.Errorw("Failed to write cid to datastore", "err", err)
+                log.Errorw("provider queue: failed to write cid to datastore", "err", err)
             }
             counter++
         }
@@ -174,18 +174,18 @@ func (q *Queue) worker(ctx context.Context) {
         if !dsEmpty {
             head, err := q.getQueueHead(ctx)
             if err != nil {
-                log.Errorw("Error querying for head of queue, stopping provider", "err", err)
+                log.Errorw("provider queue: error querying for head of queue, stopping provider", "err", err)
                 return
             }
             if head != nil {
                 k = datastore.NewKey(head.Key)
                 if err = q.ds.Delete(ctx, k); err != nil {
-                    log.Errorw("Error deleting queue entry, stopping provider", "err", err, "key", head.Key)
+                    log.Errorw("provider queue: error deleting queue entry, stopping provider", "err", err, "key", head.Key)
                     return
                 }
                 c, err = cid.Parse(head.Value)
                 if err != nil {
-                    log.Warnw("Error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
+                    log.Warnw("provider queue: error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
                     continue
                 }
             } else {
@@ -257,7 +257,7 @@ func (q *Queue) worker(ctx context.Context) {
             err = q.commitInput(ctx, counter, &inBuf)
             if err != nil {
                 if !errors.Is(err, context.Canceled) {
-                    log.Errorw("Error writing CIDs to datastore, stopping provider", "err", err)
+                    log.Errorw("provider queue: error writing CIDs to datastore, stopping provider", "err", err)
                 }
                 return
             }
@@ -296,7 +296,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq
         c := cids.At(i)
         key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
         if err = b.Put(ctx, key, c.Bytes()); err != nil {
-            log.Errorw("Failed to add cid to batch", "err", err)
+            log.Errorw("provider queue: failed to add cid to batch", "err", err)
             continue
         }
         counter++
diff --git a/provider/provider.go b/provider/provider.go
index 485a3ff4c..c9b3b3dec 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -16,12 +16,9 @@ import (
     pin "github.com/ipfs/boxo/pinning/pinner"
     "github.com/ipfs/go-cid"
     "github.com/ipfs/go-cidutil"
-    logging "github.com/ipfs/go-log/v2"
     cidlink "github.com/ipld/go-ipld-prime/linking/cid"
 )
 
-var logR = logging.Logger("reprovider.simple")
-
 // Provider announces blocks to the network
 type Provider interface {
     // Provide takes a cid and makes an attempt to announce it to the network
@@ -107,7 +104,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
         // 1. Recursive keys
         for sc := range pinning.RecursiveKeys(ctx, false) {
             if sc.Err != nil {
-                logR.Errorf("reprovide recursive pins: %s", sc.Err)
+                log.Errorf("reprovide recursive pins: %s", sc.Err)
                 return
             }
             if !onlyRoots {
@@ -120,7 +117,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
         // 2. Direct pins
         for sc := range pinning.DirectKeys(ctx, false) {
             if sc.Err != nil {
-                logR.Errorf("reprovide direct pins: %s", sc.Err)
+                log.Errorf("reprovide direct pins: %s", sc.Err)
                 return
             }
             _ = set.Visitor(ctx)(sc.Pin.Key)
@@ -143,7 +140,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
             })
         })
         if err != nil {
-            logR.Errorf("reprovide indirect pins: %s", err)
+            log.Errorf("reprovide indirect pins: %s", err)
             return
         }
     }()
diff --git a/provider/reprovider.go b/provider/reprovider.go
index 65e21ccd6..7cb6b7d3f 100644
--- a/provider/reprovider.go
+++ b/provider/reprovider.go
@@ -37,7 +37,7 @@ const (
     provideDelayWarnDuration = 15 * time.Second
 )
 
-var log = logging.Logger("provider.batched")
+var log = logging.Logger("provider")
 
 type reprovider struct {
     ctx context.Context
@@ -262,6 +262,7 @@ func (s *reprovider) provideWorker() {
     provCh := s.q.Dequeue()
 
     provideFunc := func(ctx context.Context, c cid.Cid) {
+        log.Debugf("provider worker: providing %s", c)
         if err := s.rsys.Provide(ctx, c, true); err != nil {
             log.Errorf("failed to provide %s: %s", c, err)
         }
@@ -366,7 +367,7 @@ func (s *reprovider) waitUntilProvideSystemReady() {
             ticker = time.NewTicker(time.Minute)
             defer ticker.Stop()
         }
-        log.Debugf("reprovider system not ready")
+        log.Infof("reprovider system not ready, waiting 1m")
         select {
         case <-ticker.C:
         case <-s.ctx.Done():
@@ -453,16 +454,16 @@ func (s *reprovider) Reprovide(ctx context.Context) error {
 
         s.waitUntilProvideSystemReady()
 
-        log.Debugf("starting reprovide of %d keys", len(keys))
+        log.Infof("starting reprovide of %d keys", len(keys))
         start := time.Now()
         err := doProvideMany(s.ctx, s.rsys, keys)
         if err != nil {
-            log.Debugf("reproviding failed %v", err)
+            log.Errorf("reproviding failed %v", err)
             continue
         }
         dur := time.Since(start)
         recentAvgProvideDuration := dur / time.Duration(len(keys))
-        log.Debugf("finished reproviding %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration)
+        log.Infof("finished reproviding %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration)
 
         totalProvideTime := time.Duration(s.totalReprovides) * s.avgReprovideDuration
         s.statLk.Lock()
@@ -538,6 +539,7 @@ func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) e
     }
 
     for _, k := range keys {
+        log.Debugf("providing %s", k)
         if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil {
             return err
         }
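For consumers of this change, here is a minimal sketch (not part of the patch) of how the consolidated facility might be enabled at debug level using `go-log/v2`; the facility name `provider` comes from the diff above, everything else is ordinary go-log usage.

```go
package main

import (
	"fmt"

	logging "github.com/ipfs/go-log/v2"
)

func main() {
	// Raise the consolidated "provider" facility to debug so the new
	// per-CID debug statements added in this diff become visible.
	// Before this change, three separate facilities would have needed
	// their levels set: "reprovider.simple", "provider.batched", and
	// "provider.queue".
	if err := logging.SetLogLevel("provider", "debug"); err != nil {
		fmt.Println("failed to set log level:", err)
	}
}
```

The same effect can be had without code through go-log's environment variable, e.g. `GOLOG_LOG_LEVEL="provider=debug"`.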