Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
144a102
wip
gammazero May 12, 2025
f14784e
update bcast counter
gammazero May 15, 2025
e474982
inc broacast count from messagequeue
gammazero May 15, 2025
f43d15f
Add haves received metric
gammazero May 16, 2025
d62889d
Spam reduction with metrics
gammazero May 27, 2025
a667550
boradcast to peer if peer already has a pending message
gammazero May 27, 2025
53da940
count total blocks and haves
gammazero May 28, 2025
b2af2d0
mark broadcast targets instead of keeping counts
gammazero May 28, 2025
f387b14
unique blocks received metric
gammazero May 28, 2025
4efd574
separate mutex for broadcast targets
gammazero May 28, 2025
7a8e954
use regular mutex for broadcast targets
gammazero May 28, 2025
a9efd6d
disable piggybacking spam on existing messages
gammazero May 28, 2025
b33463b
const to enable/disable broadcast reduction logic
gammazero May 29, 2025
81e57aa
update changelog
gammazero May 29, 2025
75ddcd8
Merge branch 'main' into broadcast-metrics
gammazero May 29, 2025
be3de17
bitswap client options for broadcast reduction configuration
gammazero Jun 4, 2025
f86d849
Rename broadcast config item
gammazero Jun 5, 2025
c6c4b45
Update bitswap/client/internal/peermanager/peerwantmanager.go
gammazero Jun 12, 2025
c08b882
Merge branch 'main' into broadcast-metrics
gammazero Jun 12, 2025
9bdf013
Rename and document bitswap broadcast config variables
gammazero Jun 12, 2025
ce32315
Document btswap broadcast reduction options in changelog
gammazero Jun 12, 2025
9205716
fix doc string
gammazero Jun 12, 2025
6793986
doc fixes
gammazero Jun 12, 2025
d468638
Update bitswap/client/internal/peermanager/peerwantmanager.go
gammazero Jun 12, 2025
0a44a99
Review changes
gammazero Jun 13, 2025
edc472a
Update CHANGELOG
gammazero Jun 13, 2025
6b6a018
Fix lint warn
gammazero Jun 13, 2025
d44c1a2
fix docstring
gammazero Jun 13, 2025
647c30b
Need SkipGauge in config
gammazero Jun 13, 2025
d5d913d
Review changes
gammazero Jun 13, 2025
83495ed
Merge branch 'main' into broadcast-metrics
gammazero Jun 13, 2025
411f594
assume local if peer has no addrs
gammazero Jun 14, 2025
6a664c5
chore: typos
lidel Jun 16, 2025
db891f9
docs(changelog): document new metrics
lidel Jun 16, 2025
61a9144
Merge branch 'main' into broadcast-metrics
gammazero Jun 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The following emojis are used to highlight certain changes:

### Changed

- `bitswap/client`: Reduce bitswap broadcast volume by limiting broadcasts to peers that have previously responded as having wanted blocks and peers on local network.
Comment thread
gammazero marked this conversation as resolved.
Outdated

### Removed

- `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)
Expand Down
8 changes: 6 additions & 2 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ func TestCustomProviderQueryManager(t *testing.T) {
defer cancel()

bs := bitswap.New(ctx, a.Adapter, pqm, a.Blockstore,
bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)),
bitswap.WithClientOption(client.WithBroadcastReduction(false)),
)
a.Exchange.Close() // close old to be sure.
a.Exchange = bs
// Connect instances only after bitswap exists.
Expand Down Expand Up @@ -586,7 +588,9 @@ func TestDontHaveTimeoutConfig(t *testing.T) {
defer cancel()

bs := bitswap.New(ctx, a.Adapter, pqm, a.Blockstore,
bitswap.WithClientOption(client.WithDontHaveTimeoutConfig(cfg)))
bitswap.WithClientOption(client.WithDontHaveTimeoutConfig(cfg)),
bitswap.WithClientOption(client.WithBroadcastReduction(false)),
Comment thread
gammazero marked this conversation as resolved.
Outdated
)
a.Exchange.Close()
a.Exchange = bs

Expand Down
65 changes: 63 additions & 2 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,36 @@ func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option {
}
}

func WithBroadcastReduction(enable bool) Option {
Comment thread
gammazero marked this conversation as resolved.
Outdated
return func(bs *Client) {
bs.bcastReduction = enable
}
}

func WithBroadcastReduceLocal(enable bool) Option {
return func(bs *Client) {
bs.bcastReduceLocal = enable
}
}

func WithBroadcastSendSkipped(n int) Option {
return func(bs *Client) {
bs.bcastSendSkipped = n
}
}

func WithBroadcastLimitPeers(limit int) Option {
return func(bs *Client) {
bs.bcastLimitPeers = limit
}
}

func WithBroadcastSendWithPending(enable bool) Option {
return func(bs *Client) {
bs.bcastSendWithPending = enable
}
}

type BlockReceivedNotifier interface {
// ReceivedBlocks notifies the decision engine that a peer is well-behaving
// and gave us useful data, potentially increasing its score and making us
Expand Down Expand Up @@ -166,17 +196,33 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
counters: new(counters),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
havesReceivedGauge: bmetrics.HavesReceivedGauge(ctx),
uniqueBlocksReceivedGauge: bmetrics.UniqueBlocksReceivedGauge(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
simulateDontHavesOnTimeout: true,
defaultProviderQueryManager: true,
bcastReduction: true,
}

// apply functional options before starting and running bitswap
for _, option := range options {
option(bs)
}

var bcastConfig *bspm.BroadcastConfig
if bs.bcastReduction {
bcastConfig = &bspm.BroadcastConfig{
LimitPeers: bs.bcastLimitPeers,
SendSkipped: bs.bcastSendSkipped,
SendWithPending: bs.bcastSendWithPending,
SkipGauge: bmetrics.BroadcastSkipGauge(ctx),
}
if !bs.bcastReduceLocal {
bcastConfig.LocalAlways = network.GetPeerstore()
}
}

// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
Expand All @@ -201,7 +247,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro

sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory)
pm := bspm.New(ctx, peerQueueFactory, bcastConfig)

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
Expand Down Expand Up @@ -237,7 +283,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
} else if providerFinder != nil {
sessionProvFinder = providerFinder
}
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network)
Expand Down Expand Up @@ -285,6 +331,9 @@ type Client struct {
dupMetric metrics.Histogram
allMetric metrics.Histogram

havesReceivedGauge bspm.Gauge
uniqueBlocksReceivedGauge bspm.Gauge

// External statistics interface
tracer tracer.Tracer

Expand All @@ -311,6 +360,12 @@ type Client struct {
skipDuplicatedBlocksStats bool

perPeerSendDelay time.Duration

bcastReduction bool
bcastReduceLocal bool
bcastLimitPeers int
bcastSendSkipped int
bcastSendWithPending bool
}

type counters struct {
Expand Down Expand Up @@ -384,6 +439,10 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
default:
}

if len(blks) != 0 || len(haves) != 0 {
bs.pm.MarkBroadcastTarget(from)
}

wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
if log.Level().Enabled(zapcore.DebugLevel) {
for _, b := range notWanted {
Expand Down Expand Up @@ -483,6 +542,8 @@ func (bs *Client) updateReceiveCounters(blocks []blocks.Block) {
if has {
c.dupBlocksRecvd++
c.dupDataRecvd += uint64(blkLen)
} else {
bs.uniqueBlocksReceivedGauge.Inc()
Comment thread
gammazero marked this conversation as resolved.
Outdated
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type MessageQueue struct {
events chan<- messageEvent

perPeerDelay time.Duration

BcastInc func()
}

// recallWantlist keeps a list of pending wants and a list of sent wants
Expand Down Expand Up @@ -424,6 +426,13 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}

func (mq *MessageQueue) HasMessage() bool {
mq.wllock.Lock()
defer mq.wllock.Unlock()

return mq.bcstWants.pending.Len() != 0 || mq.peerWants.pending.Len() != 0 || mq.cancels.Len() != 0
}

// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
Expand Down Expand Up @@ -862,6 +871,9 @@ FINISH:
for _, e := range bcstEntries[:sentBcstEntries] {
if e.Cid.Defined() { // Check if want was canceled in the interim
mq.bcstWants.setSentAt(e.Cid, now)
if mq.BcastInc != nil {
mq.BcastInc()
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
Expand All @@ -18,6 +19,7 @@ type PeerQueue interface {
AddWants([]cid.Cid, []cid.Cid)
AddCancels([]cid.Cid)
ResponseReceived(ks []cid.Cid)
HasMessage() bool
Startup()
Shutdown()
}
Expand All @@ -44,20 +46,25 @@ type PeerManager struct {
psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

bcastGauge Gauge
}

// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
func New(ctx context.Context, createPeerQueue PeerQueueFactory, bcastConfig *BroadcastConfig) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()

return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
pwm: newPeerWantManager(wantGauge, wantBlockGauge, bcastConfig),
createPeerQueue: createPeerQueue,
ctx: ctx,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),

bcastGauge: metrics.NewCtx(ctx, "wanthaves_broadcast", "Number of want-haves broadcast.").Gauge(),
}
}

Expand Down Expand Up @@ -189,6 +196,10 @@ func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
pq, ok := pm.peerQueues[p]
if !ok {
pq = pm.createPeerQueue(pm.ctx, p)
mq, ok := pq.(*messagequeue.MessageQueue)
if ok {
mq.BcastInc = pm.bcastGauge.Inc
}
pq.Startup()
pm.peerQueues[p] = pq
}
Expand Down Expand Up @@ -227,6 +238,10 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
delete(pm.sessions, ses)
}

func (pm *PeerManager) MarkBroadcastTarget(from peer.ID) {
pm.pwm.markBroadcastTarget(from)
}

// signalAvailability is called when a peer's connectivity changes.
// It informs interested sessions.
func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
Expand Down
19 changes: 12 additions & 7 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}

func (fp *mockPeerQueue) HasMessage() bool {
return true
}

func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
}

Expand Down Expand Up @@ -86,7 +90,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {

tp := random.Peers(6)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

peerManager.Connected(peer1)
peerManager.Connected(peer2)
Expand Down Expand Up @@ -129,7 +133,7 @@ func TestBroadcastOnConnect(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

cids := random.Cids(2)
peerManager.BroadcastWantHaves(ctx, cids)
Expand All @@ -150,7 +154,7 @@ func TestBroadcastWantHaves(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

cids := random.Cids(3)

Expand Down Expand Up @@ -191,7 +195,7 @@ func TestSendWants(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(2)
peer1 := tp[0]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)
cids := random.Cids(4)

peerManager.Connected(peer1)
Expand Down Expand Up @@ -225,7 +229,7 @@ func TestSendCancels(t *testing.T) {
peerQueueFactory := makePeerQueueFactory(msgs)
tp := random.Peers(3)
peer1, peer2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)
cids := random.Cids(4)

// Connect to peer1 and peer2
Expand Down Expand Up @@ -286,7 +290,7 @@ func TestSessionRegistration(t *testing.T) {

tp := random.Peers(3)
p1, p2 := tp[0], tp[1]
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

id := uint64(1)
s := newSess(id)
Expand Down Expand Up @@ -332,6 +336,7 @@ func (*benchPeerQueue) Shutdown() {}
func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
func (*benchPeerQueue) HasMessage() bool { return true }
func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}

// Simplistic benchmark to allow us to stress test
Expand All @@ -345,7 +350,7 @@ func BenchmarkPeerManager(b *testing.B) {
}

peers := random.Peers(500)
peerManager := New(ctx, peerQueueFactory)
peerManager := New(ctx, peerQueueFactory, bcastAlways)

// Create a bunch of connections
connected := 0
Expand Down
Loading