diff --git a/CHANGELOG.md b/CHANGELOG.md
index c25117d01..7227a8832 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,8 +16,23 @@ The following emojis are used to highlight certain changes:
 
 ### Added
 
+- `bitswap/client`: New metrics:
+  - `ipfs_bitswap_wanthaves_broadcast`: Count of want-haves broadcasts
+  - `ipfs_bitswap_haves_received`: Count of total have responses
+  - `ipfs_bitswap_bcast_skips_total`: Count of broadcasts skipped as part of spam reduction logic (see "Changed" below)
+  - `ipfs_bitswap_unique_blocks_received`: Count of non-duplicate blocks received
+
 ### Changed
 
+- `bitswap/client`: Added an opt-in ability to reduce bitswap broadcast volume by limiting broadcasts to peers that have previously responded with wanted blocks or HAVEs, and to peers on the local network. The following bitswap client options are available to configure the behavior of broadcast reduction:
+  - `BroadcastControlEnable` enables or disables broadcast reduction logic. Setting this to `false` restores the previous broadcast behavior of sending broadcasts to all peers, and ignores all other `BroadcastControl` options. Default is `false` (disabled).
+  - `BroadcastControlMaxPeers` sets a hard limit on the number of peers to send broadcasts to. A value of `0` means no broadcasts are sent. A value of `-1` means there is no limit. Default is `-1` (unlimited).
+  - `BroadcastControlLocalPeers` enables or disables broadcast control for peers on the local network. If `false`, then always broadcast to peers on the local network. If `true`, apply broadcast control to local peers. Default is `false` (always broadcast to local peers).
+  - `BroadcastControlPeeredPeers` enables or disables broadcast control for peers configured for peering. If `false`, then always broadcast to peers configured for peering. If `true`, apply broadcast control to peered peers. Default is `false` (always broadcast to peered peers).
+  - `BroadcastControlMaxRandomPeers` sets the number of peers to broadcast to anyway, even though broadcast control logic has determined that they are not broadcast targets. Setting this to a non-zero value ensures that at least this many random peers receive a broadcast. This may be helpful in cases where peers that are not receiving broadcasts may have wanted blocks. Default is `0` (no random broadcasts).
+  - `BroadcastControlSendToPendingPeers` enables or disables sending broadcasts to any peers to which there is a pending message to send. When `true` (enabled), this sends broadcasts to many more peers, but does so in a way that does not increase the number of separate broadcast messages. There is still the increased cost of the recipients having to process and respond to the broadcasts. Default is `false`.
+
+
 ### Removed
 
 - `bitswap/server` do not allow override of peer ledger with `WithPeerLedger` [#938](https://github.com/ipfs/boxo/pull/938)
diff --git a/bitswap/client/client.go b/bitswap/client/client.go
index 07ec463cb..e063696d9 100644
--- a/bitswap/client/client.go
+++ b/bitswap/client/client.go
@@ -137,6 +137,69 @@ func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option {
 	}
 }
 
+// BroadcastControlEnable enables or disables broadcast reduction logic.
+// Setting this to false restores the previous broadcast behavior of sending
+// broadcasts to all peers, and ignores all other BroadcastControl options.
+// Default is false (disabled).
+func BroadcastControlEnable(enable bool) Option {
+	return func(bs *Client) {
+		bs.bcastControl.Enable = enable
+	}
+}
+
+// BroadcastControlMaxPeers sets a hard limit on the number of peers to send
+// broadcasts to. A value of 0 means no broadcasts are sent. A value of -1
+// means there is no limit. Default is -1 (unlimited).
+func BroadcastControlMaxPeers(limit int) Option {
+	return func(bs *Client) {
+		bs.bcastControl.MaxPeers = limit
+	}
+}
+
+// BroadcastControlLocalPeers enables or disables broadcast control for peers
+// on the local network. If false, then always broadcast to peers on the local
+// network. If true, apply broadcast control to local peers. Default is false
+// (always broadcast to local peers).
+func BroadcastControlLocalPeers(enable bool) Option {
+	return func(bs *Client) {
+		bs.bcastControl.LocalPeers = enable
+	}
+}
+
+// BroadcastControlPeeredPeers enables or disables broadcast control for peers
+// configured for peering. If false, then always broadcast to peers configured
+// for peering. If true, apply broadcast control to peered peers. Default is
+// false (always broadcast to peered peers).
+func BroadcastControlPeeredPeers(enable bool) Option {
+	return func(bs *Client) {
+		bs.bcastControl.PeeredPeers = enable
+	}
+}
+
+// BroadcastControlMaxRandomPeers sets the number of peers to broadcast to
+// anyway, even though broadcast control logic has determined that they are
+// not broadcast targets. Setting this to a non-zero value ensures that at
+// least this many random peers receive a broadcast. This may be helpful in
+// cases where peers that are not receiving broadcasts may have wanted blocks.
+// Default is 0 (no random broadcasts).
+func BroadcastControlMaxRandomPeers(n int) Option {
+	return func(bs *Client) {
+		bs.bcastControl.MaxRandomPeers = n
+	}
+}
+
+// BroadcastControlSendToPendingPeers enables or disables sending broadcasts
+// to any peers to which there is a pending message to send. When enabled, this
+// sends broadcasts to many more peers, but does so in a way that does not
+// increase the number of separate broadcast messages. There is still the
+// increased cost of the recipients having to process and respond to the
+// broadcasts. Default is false.
+func BroadcastControlSendToPendingPeers(enable bool) Option {
+	return func(bs *Client) {
+		bs.bcastControl.SendToPendingPeers = enable
+	}
+}
+
 type BlockReceivedNotifier interface {
 	// ReceivedBlocks notifies the decision engine that a peer is well-behaving
 	// and gave us useful data, potentially increasing its score and making us
@@ -166,10 +229,16 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 		counters:                    new(counters),
 		dupMetric:                   bmetrics.DupHist(ctx),
 		allMetric:                   bmetrics.AllHist(ctx),
+		havesReceivedGauge:          bmetrics.HavesReceivedGauge(ctx),
+		blocksReceivedGauge:         bmetrics.BlocksReceivedGauge(ctx),
 		provSearchDelay:             defaults.ProvSearchDelay,
 		rebroadcastDelay:            delay.Fixed(defaults.RebroadcastDelay),
 		simulateDontHavesOnTimeout:  true,
 		defaultProviderQueryManager: true,
+
+		bcastControl: bspm.BroadcastControl{
+			MaxPeers: -1,
+		},
 	}
 
 	// apply functional options before starting and running bitswap
@@ -177,6 +246,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 		option(bs)
 	}
 
+	if bs.bcastControl.Enable {
+		if bs.bcastControl.NeedHost() {
+			bs.bcastControl.Host = network.Host()
+		}
+		bs.bcastControl.SkipGauge = bmetrics.BroadcastSkipGauge(ctx)
+	}
+
 	// onDontHaveTimeout is called when a want-block is sent to a peer that
 	// has an old version of Bitswap that doesn't support DONT_HAVE messages,
 	// or when no response is received within a timeout.
@@ -201,7 +277,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 	sim := bssim.New()
 	bpm := bsbpm.New()
 
-	pm := bspm.New(ctx, peerQueueFactory)
+	pm := bspm.New(ctx, peerQueueFactory, bs.bcastControl)
 
 	if bs.providerFinder != nil && bs.defaultProviderQueryManager {
 		// network can do dialing.
@@ -237,7 +313,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
 		} else if providerFinder != nil {
 			sessionProvFinder = providerFinder
 		}
-		return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
+		return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self, bs.havesReceivedGauge)
 	}
 	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
 		return bsspm.New(id, network)
@@ -285,6 +361,9 @@ type Client struct {
 	dupMetric metrics.Histogram
 	allMetric metrics.Histogram
 
+	havesReceivedGauge  bspm.Gauge
+	blocksReceivedGauge bspm.Gauge
+
 	// External statistics interface
 	tracer tracer.Tracer
 
@@ -311,6 +390,9 @@ type Client struct {
 	skipDuplicatedBlocksStats bool
 
 	perPeerSendDelay time.Duration
+
+	// Broadcast control configuration.
+	bcastControl bspm.BroadcastControl
 }
 
 type counters struct {
@@ -384,6 +466,10 @@ func (bs *Client) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []bl
 	default:
 	}
 
+	if len(blks) != 0 || len(haves) != 0 {
+		bs.pm.MarkBroadcastTarget(from)
+	}
+
 	wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
 	if log.Level().Enabled(zapcore.DebugLevel) {
 		for _, b := range notWanted {
@@ -484,6 +570,7 @@ func (bs *Client) updateReceiveCounters(blocks []blocks.Block) {
 			c.dupBlocksRecvd++
 			c.dupDataRecvd += uint64(blkLen)
 		}
+		bs.blocksReceivedGauge.Inc()
 	}
 }
 
diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go
index e9ff75949..4249149d5 100644
--- a/bitswap/client/internal/messagequeue/messagequeue.go
+++ b/bitswap/client/internal/messagequeue/messagequeue.go
@@ -107,6 +107,8 @@ type MessageQueue struct {
 	events chan<- messageEvent
 
 	perPeerDelay time.Duration
+
+	BcastInc func()
 }
 
 // recallWantlist keeps a list of pending wants and a list of sent wants
@@ -424,6 +426,13 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
 	}
 }
 
+func (mq *MessageQueue) HasMessage() bool {
+	mq.wllock.Lock()
+	defer mq.wllock.Unlock()
+
+	return mq.bcstWants.pending.Len() != 0 || mq.peerWants.pending.Len() != 0 || mq.cancels.Len() != 0
+}
+
 // ResponseReceived is called when a message is received from the network.
 // ks is the set of blocks, HAVEs and DONT_HAVEs in the message
 // Note that this is just used to calculate latency.
@@ -862,6 +871,9 @@ FINISH:
 	for _, e := range bcstEntries[:sentBcstEntries] {
 		if e.Cid.Defined() { // Check if want was canceled in the interim
 			mq.bcstWants.setSentAt(e.Cid, now)
+			if mq.BcastInc != nil {
+				mq.BcastInc()
+			}
 		}
 	}
 
diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go
index 28e822652..81021f86c 100644
--- a/bitswap/client/internal/peermanager/peermanager.go
+++ b/bitswap/client/internal/peermanager/peermanager.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"sync"
 
+	"github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
 	cid "github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipfs/go-metrics-interface"
@@ -18,6 +19,7 @@ type PeerQueue interface {
 	AddWants([]cid.Cid, []cid.Cid)
 	AddCancels([]cid.Cid)
 	ResponseReceived(ks []cid.Cid)
+	HasMessage() bool
 	Startup()
 	Shutdown()
 }
@@ -44,20 +46,25 @@ type PeerManager struct {
 	psLk         sync.RWMutex
 	sessions     map[uint64]Session
 	peerSessions map[peer.ID]map[uint64]struct{}
+
+	bcastGauge Gauge
 }
 
 // New creates a new PeerManager, given a context and a peerQueueFactory.
-func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
+func New(ctx context.Context, createPeerQueue PeerQueueFactory, bcastControl BroadcastControl) *PeerManager {
 	wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
 	wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
+
 	return &PeerManager{
 		peerQueues: make(map[peer.ID]PeerQueue),
-		pwm:        newPeerWantManager(wantGauge, wantBlockGauge),
+		pwm:        newPeerWantManager(wantGauge, wantBlockGauge, bcastControl),
 
 		createPeerQueue: createPeerQueue,
 		ctx:             ctx,
 
 		sessions:     make(map[uint64]Session),
 		peerSessions: make(map[peer.ID]map[uint64]struct{}),
+
+		bcastGauge: metrics.NewCtx(ctx, "wanthaves_broadcast", "Number of want-haves broadcast.").Gauge(),
 	}
 }
 
@@ -189,6 +196,10 @@ func (pm *PeerManager) getOrCreate(p peer.ID) PeerQueue {
 	pq, ok := pm.peerQueues[p]
 	if !ok {
 		pq = pm.createPeerQueue(pm.ctx, p)
+		mq, ok := pq.(*messagequeue.MessageQueue)
+		if ok {
+			mq.BcastInc = pm.bcastGauge.Inc
+		}
 		pq.Startup()
 		pm.peerQueues[p] = pq
 	}
@@ -227,6 +238,10 @@ func (pm *PeerManager) UnregisterSession(ses uint64) {
 	delete(pm.sessions, ses)
 }
 
+func (pm *PeerManager) MarkBroadcastTarget(from peer.ID) {
+	pm.pwm.markBroadcastTarget(from)
+}
+
 // signalAvailability is called when a peer's connectivity changes.
 // It informs interested sessions.
 func (pm *PeerManager) signalAvailability(p peer.ID, isConnected bool) {
diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go
index b45cd4c33..e85a3ad64 100644
--- a/bitswap/client/internal/peermanager/peermanager_test.go
+++ b/bitswap/client/internal/peermanager/peermanager_test.go
@@ -39,6 +39,10 @@ func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
 	fp.msgs <- msg{fp.p, nil, nil, cs}
 }
 
+func (fp *mockPeerQueue) HasMessage() bool {
+	return true
+}
+
 func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
 }
 
@@ -86,7 +90,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
 	tp := random.Peers(6)
 	peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
 
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 
 	peerManager.Connected(peer1)
 	peerManager.Connected(peer2)
@@ -129,7 +133,7 @@ func TestBroadcastOnConnect(t *testing.T) {
 	peerQueueFactory := makePeerQueueFactory(msgs)
 	tp := random.Peers(2)
 	peer1 := tp[0]
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 
 	cids := random.Cids(2)
 	peerManager.BroadcastWantHaves(ctx, cids)
@@ -150,7 +154,7 @@ func TestBroadcastWantHaves(t *testing.T) {
 	peerQueueFactory := makePeerQueueFactory(msgs)
 	tp := random.Peers(3)
 	peer1, peer2 := tp[0], tp[1]
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 
 	cids := random.Cids(3)
 
@@ -191,7 +195,7 @@ func TestSendWants(t *testing.T) {
 	peerQueueFactory := makePeerQueueFactory(msgs)
 	tp := random.Peers(2)
 	peer1 := tp[0]
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 	cids := random.Cids(4)
 
 	peerManager.Connected(peer1)
@@ -225,7 +229,7 @@ func TestSendCancels(t *testing.T) {
 	peerQueueFactory := makePeerQueueFactory(msgs)
 	tp := random.Peers(3)
 	peer1, peer2 := tp[0], tp[1]
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 	cids := random.Cids(4)
 
 	// Connect to peer1 and peer2
@@ -286,7 +290,7 @@ func TestSessionRegistration(t *testing.T) {
 	tp := random.Peers(3)
 	p1, p2 := tp[0], tp[1]
 
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 
 	id := uint64(1)
 	s := newSess(id)
@@ -332,6 +336,7 @@ func (*benchPeerQueue) Shutdown() {}
 func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
 func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
 func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
+func (*benchPeerQueue) HasMessage() bool { return true }
 func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}
 
 // Simplistic benchmark to allow us to stress test
@@ -345,7 +350,7 @@ func BenchmarkPeerManager(b *testing.B) {
 	}
 
 	peers := random.Peers(500)
-	peerManager := New(ctx, peerQueueFactory)
+	peerManager := New(ctx, peerQueueFactory, bcastAlways)
 
 	// Create a bunch of connections
 	connected := 0
diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go
index 59d9e16fc..6fa0a262d 100644
--- a/bitswap/client/internal/peermanager/peerwantmanager.go
+++ b/bitswap/client/internal/peermanager/peerwantmanager.go
@@ -3,9 +3,13 @@ package peermanager
 import (
 	"fmt"
 	"strings"
+	"sync"
 
+	"github.com/ipfs/boxo/peering"
 	cid "github.com/ipfs/go-cid"
+	"github.com/libp2p/go-libp2p/core/host"
 	peer "github.com/libp2p/go-libp2p/core/peer"
+	manet "github.com/multiformats/go-multiaddr/net"
 )
 
 // Gauge can be used to keep track of a metric that increases and decreases
@@ -16,6 +20,45 @@ type Gauge interface {
 	Dec()
 }
 
+// BroadcastControl configures broadcast control functionality.
+type BroadcastControl struct {
+	// Enable enables or disables broadcast control.
+	Enable bool
+	// Host is the libp2p host used to get peer information.
+	Host host.Host
+	// MaxPeers is the hard limit on the number of peers to send broadcasts
+	// to. A value of 0 means no broadcasts are sent. A value of -1 means there
+	// is no limit.
+	MaxPeers int
+	// LocalPeers enables or disables broadcast control for peers on the local
+	// network. If false, then always broadcast to peers on the local network.
+	// If true, apply broadcast reduction to local peers.
+	LocalPeers bool
+	// PeeredPeers enables or disables broadcast reduction for peers configured
+	// for peering. If false, then always broadcast to peers configured for
+	// peering. If true, apply broadcast reduction to peered peers. Default is
+	// false (always broadcast to peered peers).
+	PeeredPeers bool
+	// MaxRandomPeers is the number of peers to broadcast to anyway, even
+	// though broadcast reduction logic has determined that they are not
+	// broadcast targets. Setting this to a non-zero value ensures that at
+	// least this many random peers receive a broadcast. This may be helpful in
+	// cases where peers that are not receiving broadcasts may have wanted
+	// blocks.
+	MaxRandomPeers int
+	// SendToPendingPeers, when true, sends broadcasts to any peers that already
+	// have a pending message to send.
+	SendToPendingPeers bool
+	// SkipGauge is the Gauge that tracks the number of broadcasts
+	// skipped by broadcast reduction logic.
+	SkipGauge Gauge
+}
+
+// NeedHost returns true if the Host is required to support the configuration.
+func (bc BroadcastControl) NeedHost() bool {
+	return bc.MaxPeers != 0 && !bc.LocalPeers && !bc.PeeredPeers
+}
+
 // peerWantManager keeps track of which want-haves and want-blocks have been
 // sent to each peer, so that the PeerManager doesn't send duplicates.
 type peerWantManager struct {
@@ -34,6 +77,11 @@ type peerWantManager struct {
 	wantGauge Gauge
 	// Keeps track of the number of active want-blocks
 	wantBlockGauge Gauge
+
+	bcastControl BroadcastControl
+	bcastMutex   sync.Mutex
+	bcastTargets map[peer.ID]struct{}
+	remotePeers  map[peer.ID]struct{}
 }
 
 type peerWant struct {
@@ -44,14 +92,26 @@ type peerWant struct {
 
 // New creates a new peerWantManager with a Gauge that keeps track of the
 // number of active want-blocks (ie sent but no response received)
-func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
-	return &peerWantManager{
+func newPeerWantManager(wantGauge, wantBlockGauge Gauge, bcastControl BroadcastControl) *peerWantManager {
+	pwm := &peerWantManager{
 		broadcastWants: cid.NewSet(),
 		peerWants:      make(map[peer.ID]*peerWant),
 		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
 		wantGauge:      wantGauge,
 		wantBlockGauge: wantBlockGauge,
 	}
+
+	if bcastControl.Enable {
+		if bcastControl.Host == nil && bcastControl.NeedHost() {
+			panic("Host missing from BroadcastControl")
+		}
+
+		pwm.bcastControl = bcastControl
+		pwm.bcastTargets = make(map[peer.ID]struct{})
+		pwm.remotePeers = make(map[peer.ID]struct{})
+	}
+
+	return pwm
 }
 
 // addPeer adds a peer whose wants we need to keep track of. It sends the
@@ -112,10 +172,32 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 	})
 
 	delete(pwm.peerWants, p)
+	delete(pwm.remotePeers, p)
+
+	if pwm.bcastTargets != nil {
+		pwm.bcastMutex.Lock()
+		delete(pwm.bcastTargets, p)
+		pwm.bcastMutex.Unlock()
+	}
 }
 
 // broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
 func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
+	var reduce bool
+	var maxPeers int
+	var randosToSend int
+
+	// If broadcast reduction logic is enabled.
+	if pwm.bcastControl.Enable {
+		if pwm.bcastControl.MaxPeers == 0 {
+			// Broadcasts are completely disabled.
+			return
+		}
+		reduce = true
+		maxPeers = pwm.bcastControl.MaxPeers
+		randosToSend = pwm.bcastControl.MaxRandomPeers
+	}
+
 	unsent := make([]cid.Cid, 0, len(wantHaves))
 	for _, c := range wantHaves {
 		if pwm.broadcastWants.Has(c) {
@@ -140,7 +222,20 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
 	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
 
 	// Send broadcast wants to each peer
-	for _, pws := range pwm.peerWants {
+	for p, pws := range pwm.peerWants {
+		var sentRando bool
+		if reduce && pwm.skipBroadcast(p, pws.peerQueue) {
+			if randosToSend == 0 {
+				pwm.bcastControl.SkipGauge.Inc()
+				continue
+			}
+			// Send a broadcast to this random peer even though it is not a
+			// broadcast target.
+			//
+			// The source of randomness is changes to the peerWants map.
+			sentRando = true
+		}
+
 		peerUnsent := bcstWantsBuffer[:0]
 		for _, c := range unsent {
 			// If we've already sent a want to this peer, skip them.
@@ -149,10 +244,91 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
 			}
 		}
 
-		if len(peerUnsent) > 0 {
-			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
+		if len(peerUnsent) == 0 {
+			continue
+		}
+
+		pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
+
+		if sentRando {
+			randosToSend--
+		}
+
+		if maxPeers > 0 {
+			maxPeers--
+			if maxPeers == 0 {
+				break
+			}
+		}
+	}
+}
+
+func (pwm *peerWantManager) skipBroadcast(peerID peer.ID, peerQueue PeerQueue) bool {
+	// Broadcast to a peer from which blocks have previously been received.
+	pwm.bcastMutex.Lock()
+	_, ok := pwm.bcastTargets[peerID]
+	pwm.bcastMutex.Unlock()
+	if ok {
+		return false
+	}
+
+	// Broadcast to peers on the local network if they are not subject to broadcast reduction.
+	if !pwm.bcastControl.LocalPeers && pwm.isLocalPeer(peerID) {
+		// Add local peer to broadcast targets to avoid the next isLocalPeer check.
+		pwm.bcastMutex.Lock()
+		pwm.bcastTargets[peerID] = struct{}{}
+		pwm.bcastMutex.Unlock()
+		return false
+	}
+
+	// Broadcast to peers that are configured for peering if they are not subject to broadcast reduction.
+	if !pwm.bcastControl.PeeredPeers {
+		connMgr := pwm.bcastControl.Host.ConnManager()
+		if connMgr != nil && connMgr.IsProtected(peerID, peering.ConnmgrTag) {
+			// Add peered peer to broadcast targets to avoid future connection tag lookups.
+			pwm.bcastMutex.Lock()
+			pwm.bcastTargets[peerID] = struct{}{}
+			pwm.bcastMutex.Unlock()
+			return false
+		}
+	}
+
+	// Broadcast to peers that have a pending message to piggyback on.
+	if pwm.bcastControl.SendToPendingPeers && peerQueue.HasMessage() {
+		return false
+	}
+	return true
+}
+
+func (pwm *peerWantManager) markBroadcastTarget(peerID peer.ID) {
+	if pwm.bcastTargets == nil {
+		return
+	}
+	pwm.bcastMutex.Lock()
+	pwm.bcastTargets[peerID] = struct{}{}
+	pwm.bcastMutex.Unlock()
+}
+
+func (pwm *peerWantManager) isLocalPeer(peerID peer.ID) bool {
+	if _, ok := pwm.remotePeers[peerID]; ok {
+		return false
+	}
+	peerStore := pwm.bcastControl.Host.Peerstore()
+	if peerStore == nil {
+		return false
+	}
+	addrs := peerStore.Addrs(peerID)
+	// Assume local if peer has no addresses.
+	if len(addrs) == 0 {
+		return true
+	}
+	for _, addr := range addrs {
+		if manet.IsPrivateAddr(addr) || manet.IsIPLoopback(addr) {
+			return true
 		}
 	}
+	pwm.remotePeers[peerID] = struct{}{}
+	return false
 }
 
 // sendWants only sends the peer the want-blocks and want-haves that have not
diff --git a/bitswap/client/internal/peermanager/peerwantmanager_test.go b/bitswap/client/internal/peermanager/peerwantmanager_test.go
index bfe0c626d..d369520d0 100644
--- a/bitswap/client/internal/peermanager/peerwantmanager_test.go
+++ b/bitswap/client/internal/peermanager/peerwantmanager_test.go
@@ -9,6 +9,10 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+var bcastAlways = BroadcastControl{
+	Enable: false,
+}
+
 type gauge struct {
 	count int
 }
@@ -51,6 +55,10 @@ func (mpq *mockPQ) AddCancels(cs []cid.Cid) {
 	mpq.cancels = append(mpq.cancels, cs...)
 }
 
+func (mpq *mockPQ) HasMessage() bool {
+	return len(mpq.wbs) != 0 || len(mpq.whs) != 0 || len(mpq.cancels) != 0
+}
+
 func (mpq *mockPQ) ResponseReceived(ks []cid.Cid) {
 }
 
@@ -61,13 +69,13 @@ func clearSent(pqs map[peer.ID]PeerQueue) {
 }
 
 func TestEmpty(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{}, &gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{}, bcastAlways)
 	require.Empty(t, pwm.getWantBlocks())
 	require.Empty(t, pwm.getWantHaves())
 }
 
 func TestPWMBroadcastWantHaves(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{}, &gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{}, bcastAlways)
 
 	peers := random.Peers(3)
 	cids := random.Cids(2)
@@ -153,7 +161,7 @@
 }
 
 func TestPWMSendWants(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{}, &gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{}, bcastAlways)
 
 	peers := random.Peers(2)
 	p0 := peers[0]
@@ -213,7 +221,7 @@
 }
 
 func TestPWMSendCancels(t *testing.T) {
-	pwm := newPeerWantManager(&gauge{}, &gauge{})
+	pwm := newPeerWantManager(&gauge{}, &gauge{}, bcastAlways)
 
 	peers := random.Peers(2)
 	p0 := peers[0]
@@ -271,7 +279,7 @@ func TestPWMSendCancels(t *testing.T) {
 func TestStats(t *testing.T) {
 	g := &gauge{}
 	wbg := &gauge{}
-	pwm := newPeerWantManager(g, wbg)
+	pwm := newPeerWantManager(g, wbg, bcastAlways)
 
 	peers := random.Peers(2)
 	p0 := peers[0]
@@ -340,7 +348,7 @@ func TestStats(t *testing.T) {
 func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
 	g := &gauge{}
 	wbg := &gauge{}
-	pwm := newPeerWantManager(g, wbg)
+	pwm := newPeerWantManager(g, wbg, bcastAlways)
 
 	peers := random.Peers(2)
 	p0 := peers[0]
@@ -371,7 +379,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
 func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) {
 	g := &gauge{}
 	wbg := &gauge{}
-	pwm := newPeerWantManager(g, wbg)
+	pwm := newPeerWantManager(g, wbg, bcastAlways)
 
 	peers := random.Peers(2)
 	p0 := peers[0]
diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go
index 149f4b9dd..59bd99d63 100644
--- a/bitswap/client/internal/session/session.go
+++ b/bitswap/client/internal/session/session.go
@@ -142,6 +142,7 @@ func New(
 	initialSearchDelay time.Duration,
 	periodicSearchDelay delay.D,
 	self peer.ID,
+	havesReceivedGauge bspm.Gauge,
 ) *Session {
 	ctx, cancel := context.WithCancel(ctx)
 	s := &Session{
@@ -163,7 +164,7 @@ func New(
 		periodicSearchDelay: periodicSearchDelay,
 		self:                self,
 	}
-	s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)
+	s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted, havesReceivedGauge)
 
 	go s.run(ctx)
 
diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go
index bc6111907..8a52f41d3 100644
--- a/bitswap/client/internal/session/session_test.go
+++ b/bitswap/client/internal/session/session_test.go
@@ -169,7 +169,7 @@ func TestSessionGetBlocks(t *testing.T) {
 	defer notif.Shutdown()
 	id := random.SequenceNext()
 	sm := newMockSessionMgr(sim)
-	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 	blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
 	var cids []cid.Cid
 	for _, block := range blks {
@@ -249,7 +249,7 @@ func TestSessionFindMorePeers(t *testing.T) {
 	defer notif.Shutdown()
 	id := random.SequenceNext()
 	sm := newMockSessionMgr(sim)
-	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 	session.SetBaseTickDelay(200 * time.Microsecond)
 	blks := random.BlocksOfSize(broadcastLiveWantsLimit*2, blockSize)
 	var cids []cid.Cid
@@ -319,7 +319,7 @@ func TestSessionOnPeersExhausted(t *testing.T) {
 	defer notif.Shutdown()
 	id := random.SequenceNext()
 	sm := newMockSessionMgr(sim)
-	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 	blks := random.BlocksOfSize(broadcastLiveWantsLimit+5, blockSize)
 	var cids []cid.Cid
 	for _, block := range blks {
@@ -356,7 +356,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
 	defer notif.Shutdown()
 	id := random.SequenceNext()
 	sm := newMockSessionMgr(sim)
-	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "")
+	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, 10*time.Millisecond, delay.Fixed(100*time.Millisecond), "", nil)
 	blks := random.BlocksOfSize(4, blockSize)
 	var cids []cid.Cid
 	for _, block := range blks {
@@ -457,7 +457,7 @@ func TestSessionCtxCancelClosesGetBlocksChannel(t *testing.T) {
 
 	// Create a new session with its own context
 	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
-	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 
 	timerCtx, timerCancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
 	defer timerCancel()
@@ -501,7 +501,7 @@ func TestSessionOnShutdownCalled(t *testing.T) {
 	// Create a new session with its own context
 	sessctx, sesscancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer sesscancel()
-	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(sessctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 
 	// Shutdown the session
 	session.Shutdown()
@@ -524,7 +524,7 @@ func TestSessionReceiveMessageAfterCtxCancel(t *testing.T) {
 	defer notif.Shutdown()
 	id := random.SequenceNext()
 	sm := newMockSessionMgr(sim)
-	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "")
+	session := New(ctx, sm, id, fspm, fpf, sim, fpm, bpm, notif, time.Second, delay.Fixed(time.Minute), "", nil)
 	blks := random.BlocksOfSize(2, blockSize)
 	cids := []cid.Cid{blks[0].Cid(), blks[1].Cid()}
 
diff --git a/bitswap/client/internal/session/sessionwantsender.go b/bitswap/client/internal/session/sessionwantsender.go
index bc3e6c8db..6266184c7 100644
--- a/bitswap/client/internal/session/sessionwantsender.go
+++ b/bitswap/client/internal/session/sessionwantsender.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	bsbpm "github.com/ipfs/boxo/bitswap/client/internal/blockpresencemanager"
+	bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p/core/peer"
 )
@@ -107,10 +108,12 @@ type sessionWantSender struct {
 	onSend onSendFn
 	// Called when all peers explicitly don't have a block
 	onPeersExhausted onPeersExhaustedFn
+	// Tracks number of haves received.
+	havesReceivedGauge bspm.Gauge
 }
 
 func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, canceller SessionWantsCanceller,
-	bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn,
+	bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn, havesReceivedGauge bspm.Gauge,
 ) sessionWantSender {
 	ctx, cancel := context.WithCancel(context.Background())
 	sws := sessionWantSender{
@@ -129,6 +132,8 @@ func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, ca
 		bpm:              bpm,
 		onSend:           onSend,
 		onPeersExhausted: onPeersExhausted,
+
+		havesReceivedGauge: havesReceivedGauge,
 	}
 
 	return sws
@@ -634,6 +639,9 @@ func (sws *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) {
 	switch {
 	case sws.bpm.PeerHasBlock(p, c):
 		wi.setPeerBlockPresence(p, BPHave)
+		if sws.havesReceivedGauge != nil {
+			sws.havesReceivedGauge.Inc()
+		}
 	case sws.bpm.PeerDoesNotHaveBlock(p, c):
 		wi.setPeerBlockPresence(p, BPDontHave)
 	default:
diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go
index f7127f2cb..2a786d392 100644
--- a/bitswap/client/internal/session/sessionwantsender_test.go
+++ b/bitswap/client/internal/session/sessionwantsender_test.go
@@ -153,7 +153,7 @@ func TestSendWants(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -188,7 +188,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -239,7 +239,7 @@ func TestReceiveBlock(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -293,7 +293,7 @@ func TestCancelWants(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -329,7 +329,7 @@ func TestRegisterSessionWithPeerManager(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -368,7 +368,7 @@ func TestProtectConnFirstPeerToSendWantedBlock(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -423,7 +423,7 @@ func TestPeerUnavailable(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -484,7 +484,7 @@ func TestPeersExhausted(t *testing.T) {
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 
 	ep := exhaustedPeers{}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted, nil)
 
 	go spm.Run()
 
@@ -558,7 +558,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 
 	ep := exhaustedPeers{}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted, nil)
 
 	go spm.Run()
 
@@ -606,7 +606,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 
 	ep := exhaustedPeers{}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, ep.onPeersExhausted, nil)
 
 	go spm.Run()
 
@@ -644,7 +644,7 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -697,7 +697,7 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -751,7 +751,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
@@ -828,7 +828,7 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
 	bpm := bsbpm.New()
 	onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
 	onPeersExhausted := func([]cid.Cid) {}
-	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted)
+	spm := newSessionWantSender(sid, pm, fpm, swc, bpm, onSend, onPeersExhausted, nil)
 	defer spm.Shutdown()
 
 	go spm.Run()
diff --git a/bitswap/metrics/metrics.go b/bitswap/metrics/metrics.go
index e40276842..ae7174c6a 100644
--- a/bitswap/metrics/metrics.go
+++ b/bitswap/metrics/metrics.go
@@ -44,3 +44,15 @@ func PendingBlocksGauge(ctx context.Context) metrics.Gauge {
 func ActiveBlocksGauge(ctx context.Context) metrics.Gauge {
 	return metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
 }
+
+func HavesReceivedGauge(ctx context.Context) metrics.Gauge {
+	return metrics.NewCtx(ctx, "haves_received", "Number of HAVE responses received.").Gauge()
+}
+
+func BlocksReceivedGauge(ctx context.Context) metrics.Gauge {
+	return metrics.NewCtx(ctx, "blocks_received", "Number of blocks received.").Gauge()
+}
+
+func BroadcastSkipGauge(ctx context.Context) metrics.Gauge {
+	return metrics.NewCtx(ctx, "bcast_skips_total", "Number of broadcasts to peers skipped.").Gauge()
+}
diff --git a/bitswap/network/bsnet/ipfs_impl.go b/bitswap/network/bsnet/ipfs_impl.go
index 3c3a6a1f5..f9905755a 100644
--- a/bitswap/network/bsnet/ipfs_impl.go
+++ b/bitswap/network/bsnet/ipfs_impl.go
@@ -11,7 +11,6 @@ import (
 	bsmsg "github.com/ipfs/boxo/bitswap/message"
 	iface "github.com/ipfs/boxo/bitswap/network"
 	"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
-
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
@@ -274,6 +273,10 @@ func (bsnet *impl) Latency(p peer.ID) time.Duration {
 	return bsnet.host.Peerstore().LatencyEWMA(p)
 }
 
+func (bsnet *impl) Host() host.Host {
+	return bsnet.host
+}
+
 // Indicates whether the given protocol supports HAVE / DONT_HAVE messages
 func (bsnet *impl) SupportsHave(proto protocol.ID) bool {
 	switch proto {
diff --git a/bitswap/network/httpnet/httpnet.go b/bitswap/network/httpnet/httpnet.go
index cc7a53607..b1bbe67d4 100644
--- a/bitswap/network/httpnet/httpnet.go
+++ b/bitswap/network/httpnet/httpnet.go
@@ -357,6 +357,10 @@ func (ht *Network) Latency(p peer.ID) time.Duration {
 	return ht.pinger.latency(p)
 }
 
+func (ht *Network) Host() host.Host {
+	return ht.host
+}
+
 func (ht *Network) senderURLs(p peer.ID) []*senderURL {
 	pi := ht.host.Peerstore().PeerInfo(p)
 	urls := network.ExtractURLsFromPeer(pi)
diff --git a/bitswap/network/interface.go b/bitswap/network/interface.go
index 89ec990f5..1f0e24f4a 100644
--- a/bitswap/network/interface.go
+++ b/bitswap/network/interface.go
@@ -5,9 +5,8 @@ import (
 	"time"
 
 	bsmsg "github.com/ipfs/boxo/bitswap/message"
-	cid "github.com/ipfs/go-cid"
-
+	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/routing"
 	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
@@ -31,6 +30,8 @@ type BitSwapNetwork interface {
 
 	NewMessageSender(context.Context, peer.ID, *MessageSenderOpts) (MessageSender, error)
 
+	Host() host.Host
+
 	Stats() Stats
 
 	Self() peer.ID
diff --git a/bitswap/network/router.go b/bitswap/network/router.go
index 529e86749..25ac1bac9 100644
--- a/bitswap/network/router.go
+++ b/bitswap/network/router.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	bsmsg "github.com/ipfs/boxo/bitswap/message"
+	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
 	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
@@ -83,6 +84,13 @@ func (rt *router) Latency(p peer.ID) time.Duration {
 	return rt.Bitswap.Latency(p)
 }
 
+func (rt *router) Host() host.Host {
+	if rt.Bitswap == nil {
+		return rt.HTTP.Host()
+	}
+	return rt.Bitswap.Host()
+}
+
 func (rt *router) SendMessage(ctx context.Context, p peer.ID, msg bsmsg.BitSwapMessage) error {
 	// SendMessage is only used by bitswap server on sendBlocks(). We
 	// should not be passing a router to the bitswap server but we try to
diff --git a/bitswap/testnet/virtual.go b/bitswap/testnet/virtual.go
index 556529acf..2e80ddf5e 100644
--- a/bitswap/testnet/virtual.go
+++ b/bitswap/testnet/virtual.go
@@ -15,6 +15,7 @@ import (
 	delay "github.com/ipfs/go-ipfs-delay"
 	tnet "github.com/libp2p/go-libp2p-testing/net"
 	"github.com/libp2p/go-libp2p/core/connmgr"
+	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	protocol "github.com/libp2p/go-libp2p/core/protocol"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
@@ -227,6 +228,10 @@ func (nc *networkClient) Latency(p peer.ID) time.Duration {
 	return nc.network.latencies[nc.local][p]
 }
 
+func (nc *networkClient) Host() host.Host {
+	return nil
+}
+
 func (nc *networkClient) SendMessage(
 	ctx context.Context,
 	to peer.ID,
diff --git a/peering/peering.go b/peering/peering.go
index 225bcff76..43737e7a8 100644
--- a/peering/peering.go
+++ b/peering/peering.go
@@ -22,7 +22,7 @@ const (
 	// If we go over the max, we'll adjust the delay down to a random value
 	// between 90-100% of the max backoff.
 	maxBackoffJitter = 10 // %
-	connmgrTag       = "ipfs-peering"
+	ConnmgrTag       = "ipfs-peering"
 	// This needs to be sufficient to prevent two sides from simultaneously
 	// dialing.
 	initialDelay = 5 * time.Second
@@ -236,7 +236,7 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) {
 		handler.setAddrs(info.Addrs)
 	} else {
 		logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs)
-		ps.host.ConnManager().Protect(info.ID, connmgrTag)
+		ps.host.ConnManager().Protect(info.ID, ConnmgrTag)
 
 		handler = &peerHandler{
 			host: ps.host,
@@ -281,7 +281,7 @@ func (ps *PeeringService) RemovePeer(id peer.ID) {
 	if handler, ok := ps.peers[id]; ok {
 		logger.Infow("peer removed", "peer", id)
-		ps.host.ConnManager().Unprotect(id, connmgrTag)
+		ps.host.ConnManager().Unprotect(id, ConnmgrTag)
 
 		handler.stop()
 		delete(ps.peers, id)
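
Usage note (reviewer sketch, not part of the diff): the snippet below shows how a consumer might opt in to the broadcast reduction introduced above. The option names are the ones added in this change; `ctx`, `network`, `providerFinder`, and `bstore` are placeholders for whatever the caller already passes to `client.New`, whose leading positional arguments are abbreviated here rather than confirmed by this diff.

```go
// Reviewer sketch: enable broadcast reduction on an otherwise standard
// bitswap client. ctx, network, providerFinder, and bstore are assumed
// to be constructed exactly as for any existing client.New call.
bswap := client.New(ctx, network, providerFinder, bstore,
	// Opt in. With the default (false), every other BroadcastControl
	// option is ignored and all peers receive broadcasts, as before.
	client.BroadcastControlEnable(true),
	// Send each broadcast to at most 100 peers. 0 disables broadcasts
	// entirely; -1 (the default) means no limit.
	client.BroadcastControlMaxPeers(100),
	// Also send to up to 3 random peers that are not current broadcast
	// targets, so peers that have never responded can still be
	// discovered and promoted to targets.
	client.BroadcastControlMaxRandomPeers(3),
	// Piggyback broadcasts onto peers that already have a pending
	// message: more recipients without more separate messages.
	client.BroadcastControlSendToPendingPeers(true),
)
```

Because `BroadcastControlEnable` defaults to false, existing callers that pass no options keep the pre-change send-to-everyone behavior unchanged.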