Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
144a102
wip
gammazero May 12, 2025
f14784e
update bcast counter
gammazero May 15, 2025
e474982
inc broacast count from messagequeue
gammazero May 15, 2025
f43d15f
Add haves received metric
gammazero May 16, 2025
d62889d
Spam reduction with metrics
gammazero May 27, 2025
a667550
boradcast to peer if peer already has a pending message
gammazero May 27, 2025
53da940
count total blocks and haves
gammazero May 28, 2025
b2af2d0
mark broadcast targets instead of keeping counts
gammazero May 28, 2025
f387b14
unique blocks received metric
gammazero May 28, 2025
4efd574
separate mutex for broadcast targets
gammazero May 28, 2025
7a8e954
use regular mutex for broadcast targets
gammazero May 28, 2025
a9efd6d
disable piggybacking spam on existing messages
gammazero May 28, 2025
b33463b
const to enable/disable broadcast reduction logic
gammazero May 29, 2025
81e57aa
update changelog
gammazero May 29, 2025
75ddcd8
Merge branch 'main' into broadcast-metrics
gammazero May 29, 2025
be3de17
bitswap client options for broadcast reduction configuration
gammazero Jun 4, 2025
f86d849
Rename broadcast config item
gammazero Jun 5, 2025
c6c4b45
Update bitswap/client/internal/peermanager/peerwantmanager.go
gammazero Jun 12, 2025
c08b882
Merge branch 'main' into broadcast-metrics
gammazero Jun 12, 2025
9bdf013
Rename and document bitswap broadcast config variables
gammazero Jun 12, 2025
ce32315
Document btswap broadcast reduction options in changelog
gammazero Jun 12, 2025
9205716
fix doc string
gammazero Jun 12, 2025
6793986
doc fixes
gammazero Jun 12, 2025
d468638
Update bitswap/client/internal/peermanager/peerwantmanager.go
gammazero Jun 12, 2025
0a44a99
Review changes
gammazero Jun 13, 2025
edc472a
Update CHANGELOG
gammazero Jun 13, 2025
6b6a018
Fix lint warn
gammazero Jun 13, 2025
d44c1a2
fix docstring
gammazero Jun 13, 2025
647c30b
Need SkipGauge in config
gammazero Jun 13, 2025
d5d913d
Review changes
gammazero Jun 13, 2025
83495ed
Merge branch 'main' into broadcast-metrics
gammazero Jun 13, 2025
411f594
assume local if peer has no addrs
gammazero Jun 14, 2025
6a664c5
chore: typos
lidel Jun 16, 2025
db891f9
docs(changelog): document new metrics
lidel Jun 16, 2025
61a9144
Merge branch 'main' into broadcast-metrics
gammazero Jun 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,23 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap/client`: New metrics:
- `ipfs_bitswap_wanthaves_broadcast`: Count of want-haves broadcasts
- `ipfs_bitswap_haves_received`: Count of total have responses
- `ipfs_bitswap_bcast_skips_total`: Count of broadcasts skipped as part of spam reduction logic (see "Changed" below)
- `ipfs_bitswap_unique_blocks_received`: Count of non-duplicate blocks received

### Changed

- `bitswap/client`: Added an opt-in ability to reduce bitswap broadcast volume by limiting broadcasts to peers that have previously responded as having wanted blocks and peers on local network. The following bitswap client options are available to configure the behavior of broadcast reduction:
- `BroadcastControlEnable` enables or disables broadcast reduction logic. Setting this to `false` restores the previous broadcast behavior of sending broadcasts to all peers, and ignores all other `BroadcastControl` options. Default is `false` (disabled).
- `BroadcastControlMaxPeers` sets a hard limit on the number of peers to send broadcasts to. A value of `0` means no broadcasts are sent. A value of `-1` means there is no limit. Default is `-1` (unlimited).
- `BroadcastControlLocalPeers` enables or disables broadcast control for peers on the local network. If `false`, then always broadcast to peers on the local network. If `true`, apply broadcast control to local peers. Default is `false` (always broadcast to local peers).
- `BroadcastControlPeeredPeers` enables or disables broadcast control for peers configured for peering. If `false`, then always broadcast to peers configured for peering. If `true`, apply broadcast control to peered peers. Default is `false` (always broadcast to peered peers).
- `BroadcastControlMaxRandomPeers` sets the number of peers to broadcast to anyway, even though broadcast control logic has determined that they are not broadcast targets. Setting this to a non-zero value ensures at least this number of random peers receives a broadcast. This may be helpful in cases where peers that are not receiving broadcasts may have wanted blocks. Default is `0` (no random broadcasts).
- `BroadcastControlSendToPendingPeers` enables or disables sending broadcasts to any peers to which there is a pending message to send. When `true` (enabled), this sends broadcasts to many more peers, but does so in a way that does not increase the number of separate broadcast messages. There is still the increased cost of the recipients having to process and respond to the broadcasts. Default is `false`.


### Removed

- `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)
Expand Down
91 changes: 89 additions & 2 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,69 @@ func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option {
}
}

// BroadcastControlEnable turns the broadcast reduction logic on or off.
// When false, the client keeps the previous behavior of broadcasting to
// every peer, and all other BroadcastControl options are ignored. The
// default is false (disabled).
func BroadcastControlEnable(enable bool) Option {
	return func(c *Client) {
		c.bcastControl.Enable = enable
	}
}

// BroadcastControlMaxPeers caps how many peers a broadcast is sent to.
// Zero disables sending broadcasts entirely, while -1 removes the cap.
// The default is -1 (unlimited).
func BroadcastControlMaxPeers(limit int) Option {
	return func(c *Client) {
		c.bcastControl.MaxPeers = limit
	}
}

// BroadcastControlLocalPeers enables or disables broadcast control for peers
// on the local network. If false, then always broadcast to peers on the local
// network. If true, apply broadcast control to local peers. Default is false
// (always broadcast to local peers).
func BroadcastControlLocalPeers(enable bool) Option {
	return func(bs *Client) {
		bs.bcastControl.LocalPeers = enable
	}
}

// BroadcastControlPeeredPeers enables or disables broadcast control for peers
// configured for peering. If false, then always broadcast to peers configured
// for peering. If true, apply broadcast control to peered peers. Default is
// false (always broadcast to peered peers).
func BroadcastControlPeeredPeers(enable bool) Option {
	return func(bs *Client) {
		bs.bcastControl.PeeredPeers = enable
	}
}

// BroadcastControlMaxRandomPeers configures how many randomly chosen peers
// receive a broadcast even when the control logic has excluded them as
// targets. A non-zero value guarantees at least that many random recipients,
// which can help reach peers holding wanted blocks that are not currently
// broadcast targets. The default is 0 (no random broadcasts).
func BroadcastControlMaxRandomPeers(n int) Option {
	return func(c *Client) {
		c.bcastControl.MaxRandomPeers = n
	}
}

// BroadcastControlSendToPendingPeers enables or disables sending broadcasts
// to any peers to which there is a pending message to send. When enabled, this
// sends broadcasts to many more peers, but does so in a way that does not
// increase the number of separate broadcast messages. There is still the
// increased cost of the recipients having to process and respond to the
// broadcasts. Default is false.
func BroadcastControlSendToPendingPeers(enable bool) Option {
	return func(bs *Client) {
		bs.bcastControl.SendToPendingPeers = enable
	}
}

type BlockReceivedNotifier interface {
// ReceivedBlocks notifies the decision engine that a peer is well-behaving
// and gave us useful data, potentially increasing its score and making us
Expand Down Expand Up @@ -166,17 +229,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
counters: new(counters),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
havesReceivedGauge: bmetrics.HavesReceivedGauge(ctx),
blocksReceivedGauge: bmetrics.BlocksReceivedGauge(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
simulateDontHavesOnTimeout: true,
defaultProviderQueryManager: true,

bcastControl: bspm.BroadcastControl{
MaxPeers: -1,
},
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}

if bs.bcastControl.Enable {
if bs.bcastControl.NeedHost() {
bs.bcastControl.Host = network.Host()
}
bs.bcastControl.SkipGauge = bmetrics.BroadcastSkipGauge(ctx)
}

// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
Expand All @@ -201,7 +277,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory)
pm := bspm.New(ctx, peerQueueFactory, bs.bcastControl)

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
Expand Down Expand Up @@ -237,7 +313,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
} else if providerFinder != nil {
sessionProvFinder = providerFinder
}
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network)
Expand Down Expand Up @@ -285,6 +361,9 @@ type Client struct {
dupMetric metrics.Histogram
allMetric metrics.Histogram

havesReceivedGauge bspm.Gauge
blocksReceivedGauge bspm.Gauge

// External statistics interface
tracer tracer.Tracer

Expand All @@ -311,6 +390,9 @@ type Client struct {
skipDuplicatedBlocksStats bool

perPeerSendDelay time.Duration

// Broadcast control configuration.
bcastControl bspm.BroadcastControl
}

type counters struct {
Expand Down Expand Up @@ -384,6 +466,10 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
default:
}

if len(blks) != 0 || len(haves) != 0 {
bs.pm.MarkBroadcastTarget(from)
}

wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
if log.Level().Enabled(zapcore.DebugLevel) {
for _, b := range notWanted {
Expand Down Expand Up @@ -484,6 +570,7 @@ func (bs *Client) updateReceiveCounters(blocks []blocks.Block) {
c.dupBlocksRecvd++
c.dupDataRecvd += uint64(blkLen)
}
bs.blocksReceivedGauge.Inc()
}
}

Expand Down
12 changes: 12 additions & 0 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type MessageQueue struct {
events chan<- messageEvent

perPeerDelay time.Duration

BcastInc func()
}

// recallWantlist keeps a list of pending wants and a list of sent wants
Expand Down Expand Up @@ -424,6 +426,13 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}

// HasMessage reports whether the queue holds anything waiting to be sent:
// pending broadcast want-haves, pending peer-targeted wants, or pending
// cancels.
func (mq *MessageQueue) HasMessage() bool {
	mq.wllock.Lock()
	defer mq.wllock.Unlock()

	if mq.bcstWants.pending.Len() != 0 {
		return true
	}
	if mq.peerWants.pending.Len() != 0 {
		return true
	}
	return mq.cancels.Len() != 0
}

// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
Expand Down Expand Up @@ -862,6 +871,9 @@ FINISH:
for _, e := range bcstEntries[:sentBcstEntries] {
if e.Cid.Defined() { // Check if want was canceled in the interim
mq.bcstWants.setSentAt(e.Cid, now)
if mq.BcastInc != nil {
mq.BcastInc()
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
Expand All @@ -18,6 +19,7 @@ type PeerQueue interface {
AddWants([]cid.Cid, []cid.Cid)
AddCancels([]cid.Cid)
ResponseReceived(ks []cid.Cid)
HasMessage() bool
Startup()
Shutdown()
}
Expand All @@ -44,20 +46,25 @@ type PeerManager struct {
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

bcastGauge Gauge
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
func New(ctx context.Context, createPeerQueue PeerQueueFactory, bcastControl BroadcastControl) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()

return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
pwm: newPeerWantManager(wantGauge, wantBlockGauge, bcastControl),
createPeerQueue: createPeerQueue,
ctx: ctx,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),

bcastGauge: metrics.NewCtx(ctx, "wanthaves_broadcast", "Number of want-haves broadcast.").Gauge(),
}
}

Expand Down Expand Up @@ -189,6 +196,10 @@ func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
pq, ok := pm.peerQueues[p]
if !ok {
pq = pm.createPeerQueue(pm.ctx, p)
mq, ok := pq.(*messagequeue.MessageQueue)
if ok {
mq.BcastInc = pm.bcastGauge.Inc
}
pq.Startup()
pm.peerQueues[p] = pq
}
Expand Down Expand Up @@ -227,6 +238,10 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
delete(pm.sessions, ses)
}

// MarkBroadcastTarget marks peer from as a target for future broadcasts.
// The client calls this when a response (blocks or HAVEs) has been received
// from the peer.
func (pm *PeerManager) MarkBroadcastTarget(from peer.ID) {
	pm.pwm.markBroadcastTarget(from)
}

// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
Expand Down
19 changes: 12 additions & 7 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}

// HasMessage always reports true, so tests treat every mock queue as having
// a pending message.
func (fp *mockPeerQueue) HasMessage() bool {
	return true
}

// ResponseReceived is a no-op; the mock does not track responses.
func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
}

Expand Down Expand Up @@ -86,7 +90,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {

tp := random.Peers(6)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

peerManager.Connected(peer1)
peerManager.Connected(peer2)
Expand Down Expand Up @@ -129,7 +133,7 @@ func TestBroadcastOnConnect(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

cids := random.Cids(2)
peerManager.BroadcastWantHaves(ctx, cids)
Expand All @@ -150,7 +154,7 @@ func TestBroadcastWantHaves(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

cids := random.Cids(3)

Expand Down Expand Up @@ -191,7 +195,7 @@ func TestSendWants(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)
cids := random.Cids(4)

peerManager.Connected(peer1)
Expand Down Expand Up @@ -225,7 +229,7 @@ func TestSendCancels(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)
cids := random.Cids(4)

// Connect to peer1 and peer2
Expand Down Expand Up @@ -286,7 +290,7 @@ func TestSessionRegistration(t *testing.T) {

tp := random.Peers(3)
p1, p2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

id := uint64(1)
s := newSess(id)
Expand Down Expand Up @@ -332,6 +336,7 @@ func (*benchPeerQueue) Shutdown() {}
func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
// HasMessage always reports true for the benchmark stub.
func (*benchPeerQueue) HasMessage() bool { return true }
func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}

// Simplistic benchmark to allow us to stress test
Expand All @@ -345,7 +350,7 @@ func BenchmarkPeerManager(b *testing.B) {
}

peers := random.Peers(500)
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

// Create a bunch of connections
connected := 0
Expand Down
Loading