Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"go.opentelemetry.io/otel/trace"
)

// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a slice of
// the K closest peers to the given key.
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
// If the context is canceled, this function will return the context error
// along with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()
Expand All @@ -27,9 +27,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
return nil, fmt.Errorf("can't lookup empty key")
}

//TODO: I can break the interface! return []peer.ID
// TODO: I can break the interface! return []peer.ID
lookupRes, err := dht.runLookupWithFollowup(ctx, key, dht.pmGetClosestPeers(key), func(*qpeerset.QueryPeerset) bool { return false })

if err != nil {
return nil, err
}
Expand All @@ -47,7 +46,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID,
metrics.NetworkSize.M(int64(ns))
}

// refresh the cpl for this key as the query was successful
// Reset the refresh timer for this key's bucket since we've just
// successfully interacted with the closest peers to key
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())

return lookupRes.peers, nil
Expand Down
65 changes: 21 additions & 44 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")

type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
type stopFn func(*qpeerset.QueryPeerset) bool
type (
queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
stopFn func(*qpeerset.QueryPeerset) bool
)

// query represents a single DHT query.
type query struct {
Expand Down Expand Up @@ -187,7 +189,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
q.recordValuablePeers()
}

res := q.constructLookupResult(targetKadID)
res := q.constructLookupResult()
return res, q.queryPeers, nil
}

Expand Down Expand Up @@ -218,45 +220,29 @@ func (q *query) recordValuablePeers() {
}

// constructLookupResult takes the query information and uses it to construct the lookup result
func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
// determine if the query terminated early
completed := true

// Lookup and starvation are both valid ways for a lookup to complete. (Starvation does not imply failure.)
// Lookup termination (as defined in isLookupTermination) is not possible in small networks.
// Starvation is a successful query termination in small networks.
if !(q.isLookupTermination() || q.isStarvationTermination()) {
completed = false
}
func (q *query) constructLookupResult() *lookupWithFollowupResult {
// Lookup and starvation are both valid ways for a lookup to complete.
// (Starvation does not imply failure.) Lookup termination (as defined in
// isLookupTermination) is not possible in small networks. Starvation is a
// successful query termination in small networks.
completed := q.isLookupTermination() || q.isStarvationTermination()

// extract the top K not unreachable peers
var peers []peer.ID
peerState := make(map[peer.ID]qpeerset.PeerState)
qp := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)
for _, p := range qp {
state := q.queryPeers.GetState(p)
peerState[p] = state
peers = append(peers, p)
}

// get the top K overall peers
sortedPeers := kb.SortClosestPeers(peers, target)
if len(sortedPeers) > q.dht.bucketSize {
sortedPeers = sortedPeers[:q.dht.bucketSize]
}
peers := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried)

// get the top K overall peers (including unreachable)
closest := q.queryPeers.GetClosestNInStates(q.dht.bucketSize, qpeerset.PeerHeard, qpeerset.PeerWaiting, qpeerset.PeerQueried, qpeerset.PeerUnreachable)

// return the top K not unreachable peers as well as their states at the end of the query
res := &lookupWithFollowupResult{
peers: sortedPeers,
state: make([]qpeerset.PeerState, len(sortedPeers)),
peers: peers,
state: make([]qpeerset.PeerState, len(peers)),
completed: completed,
closest: closest,
}

for i, p := range sortedPeers {
res.state[i] = peerState[p]
for i, p := range peers {
res.state[i] = q.queryPeers.GetState(p)
}

return res
Expand Down Expand Up @@ -301,7 +287,7 @@ func (q *query) run() {

// termination is triggered on end-of-lookup conditions or starvation of unused peers
// it also returns the peers we should query next for a maximum of `maxNumQueriesToSpawn` peers.
ready, reason, qPeers := q.isReadyToTerminate(pathCtx, maxNumQueriesToSpawn)
ready, reason, qPeers := q.isReadyToTerminate(maxNumQueriesToSpawn)
if ready {
q.terminate(pathCtx, cancelPath, reason)
}
Expand Down Expand Up @@ -347,7 +333,7 @@ func (q *query) spawnQuery(ctx context.Context, cause peer.ID, queryPeer peer.ID
go q.queryPeer(ctx, ch, queryPeer)
}

func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {
func (q *query) isReadyToTerminate(nPeersToQuery int) (bool, LookupTerminationReason, []peer.ID) {
// give the application logic a chance to terminate
if q.stopFn(q.queryPeers) {
return true, LookupStopped, nil
Expand All @@ -360,16 +346,7 @@ func (q *query) isReadyToTerminate(ctx context.Context, nPeersToQuery int) (bool
}

// The peers we query next should be ones that we have only Heard about.
var peersToQuery []peer.ID
peers := q.queryPeers.GetClosestInStates(qpeerset.PeerHeard)
count := 0
for _, p := range peers {
peersToQuery = append(peersToQuery, p)
count++
if count == nPeersToQuery {
break
}
}
peersToQuery := q.queryPeers.GetClosestNInStates(nPeersToQuery, qpeerset.PeerHeard)

return false, -1, peersToQuery
}
Expand Down Expand Up @@ -420,7 +397,7 @@ func (q *query) queryPeer(ctx context.Context, ch chan<- *queryUpdate, p peer.ID
ctx, span := internal.StartSpan(ctx, "IpfsDHT.QueryPeer")
defer span.End()

dialCtx, queryCtx := ctx, ctx
dialCtx, queryCtx := ctx, q.ctx

// dial the peer
if err := q.dht.dialPeer(dialCtx, p); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
ctx, end := tracer.FindProvidersAsync(dhtName, ctx, key, count)
defer func() { ch = end(ch, nil) }()

peerOut := make(chan peer.AddrInfo)
if !dht.enableProviders || !key.Defined() {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
return peerOut
}

peerOut := make(chan peer.AddrInfo)

keyMH := key.Hash()

logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
Expand Down