From ac4c801845210a6f8f9e5f0415da0cb520c61d26 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 17 Oct 2024 10:36:31 -1000 Subject: [PATCH] fix: performance improvements - Fix exhausted wants problem resulting in possible performance issue - Minor improvements for GC. --- .../client/internal/session/sessionwants.go | 19 +++++++++++----- .../internal/session/sessionwantsender.go | 22 +++++++++---------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/bitswap/client/internal/session/sessionwants.go b/bitswap/client/internal/session/sessionwants.go index 0d4ded013..ce067e83f 100644 --- a/bitswap/client/internal/session/sessionwants.go +++ b/bitswap/client/internal/session/sessionwants.go @@ -56,9 +56,13 @@ func (sw *sessionWants) GetNextWants() []cid.Cid { // limit) currentLiveCount := len(sw.liveWants) toAdd := sw.broadcastLimit - currentLiveCount + liveSize := min(toAdd, sw.toFetch.Len()) + if liveSize == 0 { + return nil + } - var live []cid.Cid - for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- { + live := make([]cid.Cid, 0, liveSize) + for ; toAdd != 0 && sw.toFetch.Len() != 0; toAdd-- { c := sw.toFetch.Pop() live = append(live, c) sw.liveWantsOrder = append(sw.liveWantsOrder, c) @@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) cleaned = append(cleaned, c) } } + clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items sw.liveWantsOrder = cleaned } @@ -127,7 +132,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration) // live want CIDs up to the broadcast limit. func (sw *sessionWants) PrepareBroadcast() []cid.Cid { now := time.Now() - live := make([]cid.Cid, 0, len(sw.liveWants)) + live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit)) for _, c := range sw.liveWantsOrder { if _, ok := sw.liveWants[c]; ok { // No response was received for the want, so reset the sent time @@ -153,9 +158,11 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) { // LiveWants returns a list of live wants func (sw *sessionWants) LiveWants() []cid.Cid { - live := make([]cid.Cid, 0, len(sw.liveWants)) + live := make([]cid.Cid, len(sw.liveWants)) + var i int for c := range sw.liveWants { - live = append(live, c) + live[i] = c + i++ } return live @@ -180,7 +187,7 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid { // Has live wants indicates if there are any live wants func (sw *sessionWants) HasLiveWants() bool { - return len(sw.liveWants) > 0 + return len(sw.liveWants) != 0 } // Indicates whether the want is in either of the fetch or live queues diff --git a/bitswap/client/internal/session/sessionwantsender.go b/bitswap/client/internal/session/sessionwantsender.go index 1beefeb94..9e50771cc 100644 --- a/bitswap/client/internal/session/sessionwantsender.go +++ b/bitswap/client/internal/session/sessionwantsender.go @@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) { // Update is called when the session receives a message with incoming blocks // or HAVE / DONT_HAVE func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) { - hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0 - if !hasUpdate { + if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 { return } @@ -270,7 +269,7 @@ func (sws *sessionWantSender) onChange(changes []change) { if chng.update.from != "" { // If the update includes blocks or haves, treat it as signaling that // the peer is available - if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 { + if len(chng.update.ks) != 0 || len(chng.update.haves) != 0 { p := chng.update.from availability[p] = true @@ -296,7 +295,7 @@ func (sws *sessionWantSender) onChange(changes []change) { sws.checkForExhaustedWants(dontHaves, newlyUnavailable) // If there are any cancels, send them - if len(cancels) > 0 { + if len(cancels) != 0 { sws.canceller.CancelSessionWants(sws.sessionID, cancels) } @@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) { } // Create the want info - wi := newWantInfo(sws.peerRspTrkr) - sws.wants[c] = wi + sws.wants[c] = newWantInfo(sws.peerRspTrkr) // For each available peer, register any information we know about // whether the peer has the block @@ -451,7 +449,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid { } } } - if len(prunePeers) > 0 { + if len(prunePeers) != 0 { go func() { for p := range prunePeers { // Peer doesn't have anything we want, so remove it @@ -479,11 +477,13 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // If a peer just became unavailable, then we need to check all wants // (because it may be the last peer who hadn't sent a DONT_HAVE for a CID) - if len(newlyUnavailable) > 0 { + if len(newlyUnavailable) != 0 { // Collect all pending wants wants = make([]cid.Cid, len(sws.wants)) + var i int for c := range sws.wants { - wants = append(wants, c) + wants[i] = c + i++ } // If the last available peer in the session has become unavailable @@ -496,7 +496,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // If all available peers for a cid sent a DONT_HAVE, signal to the session // that we've exhausted available peers - if len(wants) > 0 { + if len(wants) != 0 { exhausted := sws.bpm.AllPeersDoNotHaveBlock(sws.spm.Peers(), wants) sws.processExhaustedWants(exhausted) } @@ -506,7 +506,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU // already been marked as exhausted are passed to onPeersExhausted() func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) { newlyExhausted := sws.newlyExhausted(exhausted) - if len(newlyExhausted) > 0 { + if len(newlyExhausted) != 0 { sws.onPeersExhausted(newlyExhausted) } }