Skip to content

Commit

Permalink
fix: performance improvements
Browse files Browse the repository at this point in the history
- Fix exhausted wants problem resulting in possible performance issue
- Minor improvements for GC.
  • Loading branch information
gammazero committed Oct 17, 2024
1 parent c2487a2 commit ac4c801
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
19 changes: 13 additions & 6 deletions bitswap/client/internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount
liveSize := min(toAdd, sw.toFetch.Len())
if liveSize == 0 {
return nil
}

Check warning on line 62 in bitswap/client/internal/session/sessionwants.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/session/sessionwants.go#L61-L62

Added lines #L61 - L62 were not covered by tests

var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
live := make([]cid.Cid, 0, liveSize)
for ; toAdd != 0 && sw.toFetch.Len() != 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
Expand Down Expand Up @@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
cleaned = append(cleaned, c)
}
}
clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items
sw.liveWantsOrder = cleaned
}

Expand All @@ -127,7 +132,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants))
live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit))
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
Expand All @@ -153,9 +158,11 @@ func (sw *sessionWants) CancelPending(keys []cid.Cid) {

// LiveWants returns a list of live wants
func (sw *sessionWants) LiveWants() []cid.Cid {
live := make([]cid.Cid, 0, len(sw.liveWants))
live := make([]cid.Cid, len(sw.liveWants))
var i int
for c := range sw.liveWants {
live = append(live, c)
live[i] = c
i++
}

return live
Expand All @@ -180,7 +187,7 @@ func (sw *sessionWants) RandomLiveWant() cid.Cid {

// Has live wants indicates if there are any live wants
func (sw *sessionWants) HasLiveWants() bool {
return len(sw.liveWants) > 0
return len(sw.liveWants) != 0
}

// Indicates whether the want is in either of the fetch or live queues
Expand Down
22 changes: 11 additions & 11 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) {
// Update is called when the session receives a message with incoming blocks
// or HAVE / DONT_HAVE
func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
if !hasUpdate {
if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 {
return
}

Expand Down Expand Up @@ -270,7 +269,7 @@ func (sws *sessionWantSender) onChange(changes []change) {
if chng.update.from != "" {
// If the update includes blocks or haves, treat it as signaling that
// the peer is available
if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 {
if len(chng.update.ks) != 0 || len(chng.update.haves) != 0 {
p := chng.update.from
availability[p] = true

Expand All @@ -296,7 +295,7 @@ func (sws *sessionWantSender) onChange(changes []change) {
sws.checkForExhaustedWants(dontHaves, newlyUnavailable)

// If there are any cancels, send them
if len(cancels) > 0 {
if len(cancels) != 0 {
sws.canceller.CancelSessionWants(sws.sessionID, cancels)
}

Expand Down Expand Up @@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
}

// Create the want info
wi := newWantInfo(sws.peerRspTrkr)
sws.wants[c] = wi
sws.wants[c] = newWantInfo(sws.peerRspTrkr)

// For each available peer, register any information we know about
// whether the peer has the block
Expand Down Expand Up @@ -451,7 +449,7 @@ func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
}
}
}
if len(prunePeers) > 0 {
if len(prunePeers) != 0 {
go func() {
for p := range prunePeers {
// Peer doesn't have anything we want, so remove it
Expand Down Expand Up @@ -479,11 +477,13 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU

// If a peer just became unavailable, then we need to check all wants
// (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
if len(newlyUnavailable) > 0 {
if len(newlyUnavailable) != 0 {
// Collect all pending wants
wants = make([]cid.Cid, len(sws.wants))
var i int
for c := range sws.wants {
wants = append(wants, c)
wants[i] = c
i++
}

// If the last available peer in the session has become unavailable
Expand All @@ -496,7 +496,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU

// If all available peers for a cid sent a DONT_HAVE, signal to the session
// that we've exhausted available peers
if len(wants) > 0 {
if len(wants) != 0 {
exhausted := sws.bpm.AllPeersDoNotHaveBlock(sws.spm.Peers(), wants)
sws.processExhaustedWants(exhausted)
}
Expand All @@ -506,7 +506,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU
// already been marked as exhausted are passed to onPeersExhausted()
func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) {
newlyExhausted := sws.newlyExhausted(exhausted)
if len(newlyExhausted) > 0 {
if len(newlyExhausted) != 0 {
sws.onPeersExhausted(newlyExhausted)
}
}
Expand Down

0 comments on commit ac4c801

Please sign in to comment.