diff --git a/lookup.go b/lookup.go index 03801325c..1af9cc8b9 100644 --- a/lookup.go +++ b/lookup.go @@ -14,11 +14,11 @@ import ( "go.opentelemetry.io/otel/trace" ) -// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of +// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a slice of // the K closest peers to the given key. // -// If the context is canceled, this function will return the context error along -// with the closest K peers it has found so far. +// If the context is canceled, this function will return the context error +// along with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() @@ -27,9 +27,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, return nil, fmt.Errorf("can't lookup empty key") } - //TODO: I can break the interface! return []peer.ID + // TODO: I can break the interface! return []peer.ID lookupRes, err := dht.runLookupWithFollowup(ctx, key, dht.pmGetClosestPeers(key), func(*qpeerset.QueryPeerset) bool { return false }) - if err != nil { return nil, err } @@ -47,7 +46,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, metrics.NetworkSize.M(int64(ns)) } - // refresh the cpl for this key as the query was successful + // Reset the refresh timer for this key's bucket since we've just + // successfully interacted with the closest peers to key dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now()) return lookupRes.peers, nil diff --git a/query.go b/query.go index 7c01a2af2..9d8740b32 100644 --- a/query.go +++ b/query.go @@ -24,8 +24,10 @@ import ( // ErrNoPeersQueried is returned when we failed to connect to any peers. var ErrNoPeersQueried = errors.New("failed to query any peers") -type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) -type stopFn func(*qpeerset.QueryPeerset) bool +type ( + queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) + stopFn func(*qpeerset.QueryPeerset) bool +) // query represents a single DHT query. type query struct { @@ -187,7 +189,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn q.recordValuablePeers() } - res := q.constructLookupResult(targetKadID) + res := q.constructLookupResult() return res, q.queryPeers, nil } @@ -218,45 +220,29 @@ func (q *query) recordValuablePeers() { } // constructLookupResult takes the query information and uses it to construct the lookup result -func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult { - // determine if the query terminated early - completed := true - - // Lookup and starvation are both valid ways for a lookup to complete. (Starvation does not imply failure.) - // Lookup termination (as defined in isLookupTermination) is not possible in small networks. - // Starvation is a successful query termination in small networks. - if !(q.isLookupTermination() || q.isStarvationTermination()) { - completed = false - } +func (q *query) constructLookupResult() *lookupWithFollowupResult { + // Lookup and starvation are both valid ways for a lookup to complete. + // (Starvation does not imply failure.) Lookup termination (as defined in + // isLookupTermination) is not possible in small networks. Starvation is a + // successful query termination in small networks. + completed := q.isLookupTermination() || q.isStarvationTermination() // extract the top K not unreachable peers - var peers []peer.ID - peerState := make(map[peer.ID]qpeerset.PeerState) - qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) - for _, p := range qp { - state := q.queryPeers.GetState(p) - peerState[p] = state - peers = append(peers, p) - } - - // get the top K overall peers - sortedPeers := kb.SortClosestPeers(peers, target) - if len(sortedPeers) > q.dht.bucketSize { - sortedPeers = sortedPeers[:q.dht.bucketSize] - } + peers := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried) + // get the top K overall peers (including unreachable) closest := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried, qpeerset.PeerUnreachable) // return the top K not unreachable peers as well as their states at the end of the query res := &lookupWithFollowupResult{ - peers: sortedPeers, - state: make([]qpeerset.PeerState, len(sortedPeers)), + peers: peers, + state: make([]qpeerset.PeerState, len(peers)), completed: completed, closest: closest, } - for i, p := range sortedPeers { - res.state[i] = peerState[p] + for i, p := range peers { + res.state[i] = q.queryPeers.GetState(p) } return res @@ -301,7 +287,7 @@ func (q *query) run() { // termination is triggered on end-of-lookup conditions or starvation of unused peers // it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers. - ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn) + ready, reason, qPeers := q.isReadyToTerminate(maxNumQueriesToSpawn) if ready { q.terminate(pathCtx, cancelPath, reason) } @@ -347,7 +333,7 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID go q.queryPeer(ctx, ch, queryPeer) } -func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) { +func (q *query) isReadyToTerminate(nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) { // give the application logic a chance to terminate if q.stopFn(q.queryPeers) { return true, LookupStopped, nil @@ -360,16 +346,7 @@ func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool } // The peers we query next should be ones that we have only Heard about. - var peersToQuery []peer.ID - peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard) - count := 0 - for _, p := range peers { - peersToQuery = append(peersToQuery, p) - count++ - if count == nPeersToQuery { - break - } - } + peersToQuery := q.queryPeers.GetClosestNInStates(nPeersToQuery, qpeerset.PeerHeard) return false, -1, peersToQuery } @@ -420,7 +397,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID ctx, span := internal.StartSpan(ctx, "IpfsDHT.QueryPeer") defer span.End() - dialCtx, queryCtx := ctx, ctx + dialCtx, queryCtx := ctx, q.ctx // dial the peer if err := q.dht.dialPeer(dialCtx, p); err != nil { diff --git a/routing.go b/routing.go index 2753d7087..af1fed66d 100644 --- a/routing.go +++ b/routing.go @@ -496,14 +496,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count) defer func() { ch = end(ch, nil) }() + peerOut := make(chan peer.AddrInfo) if !dht.enableProviders || !key.Defined() { - peerOut := make(chan peer.AddrInfo) close(peerOut) return peerOut } - peerOut := make(chan peer.AddrInfo) - keyMH := key.Hash() logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))