Skip to content

Commit

Permalink
Merge pull request onflow#5960 from onflow/leo/fix-request-collection
Browse files Browse the repository at this point in the history
[Backport] Fix request collection
  • Loading branch information
zhangchiqing authored May 21, 2024
2 parents 55fd3bb + d520c76 commit fa3ae7b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
26 changes: 8 additions & 18 deletions engine/common/requester/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,24 +312,11 @@ func (e *Engine) dispatchRequest() (bool, error) {
return false, fmt.Errorf("could not get providers: %w", err)
}

// randomize order of items, so that they can be requested in different order each time
rndItems := make([]flow.Identifier, 0, len(e.items))
for k := range e.items {
rndItems = append(rndItems, e.items[k].EntityID)
}
err = rand.Shuffle(uint(len(rndItems)), func(i, j uint) {
rndItems[i], rndItems[j] = rndItems[j], rndItems[i]
})
if err != nil {
return false, fmt.Errorf("shuffle failed: %w", err)
}

// go through each item and decide if it should be requested again
now := time.Now().UTC()
var providerID flow.Identifier
var entityIDs []flow.Identifier
for _, entityID := range rndItems {
item := e.items[entityID]
for entityID, item := range e.items {

// if the item should not be requested yet, ignore
cutoff := item.LastRequested.Add(item.RetryAfter)
Expand Down Expand Up @@ -363,15 +350,18 @@ func (e *Engine) dispatchRequest() (bool, error) {
// order is random and will skip the item most of the times
// when other items are available
if providerID == flow.ZeroID {
providers = providers.Filter(item.ExtraSelector)
if len(providers) == 0 {
return false, fmt.Errorf("no valid providers available")
filteredProviders := providers.Filter(item.ExtraSelector)
if len(filteredProviders) == 0 {
return false, fmt.Errorf("no valid providers available for item %s, total providers: %v", entityID.String(), len(providers))
}
id, err := providers.Sample(1)
// ramdonly select a provider from the filtered set
// to send as many item requests as possible.
id, err := filteredProviders.Sample(1)
if err != nil {
return false, fmt.Errorf("sampling failed: %w", err)
}
providerID = id[0].NodeID
providers = filteredProviders
}

// add item to list and set retry parameters
Expand Down
6 changes: 5 additions & 1 deletion engine/execution/ingestion/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/onflow/flow-go/state/protocol"
)

var onlyOnflowRegex = regexp.MustCompile(`.*\.onflow\.org:3569$`)
var onlyOnflowRegex = regexp.MustCompile(`.*\.(onflow\.org|dapper-flow\.com):3569$`)

type CollectionFetcher struct {
log zerolog.Logger
Expand Down Expand Up @@ -71,6 +71,10 @@ func (e *CollectionFetcher) FetchCollection(blockID flow.Identifier, height uint
})
}

e.log.Debug().Bool("onflowOnlyLNs", e.onflowOnlyLNs).
Msgf("queued collection %v for block %v, height %v from guarantors: %v",
guarantee.ID(), blockID, height, guarantors)

// queue the collection to be requested from one of the guarantors
e.request.EntityByID(guarantee.ID(), filter.And(
filters...,
Expand Down

0 comments on commit fa3ae7b

Please sign in to comment.