-
Notifications
You must be signed in to change notification settings - Fork 256
go-libp2p-kad-dht: implement GetValuesAsync #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -143,33 +143,85 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) { | |
| } | ||
|
|
||
| func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]routing.RecvdVal, error) { | ||
| var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0) | ||
| go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, nil) | ||
|
|
||
| var vals []routing.RecvdVal | ||
| loop: | ||
| for { | ||
| select { | ||
| case res := <-resChan: | ||
| if res == nil { | ||
| break loop | ||
| } else if res.Error != nil { | ||
| return nil, res.Error | ||
| } else { | ||
| vals = append(vals, *res) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return vals, nil | ||
| } | ||
|
|
||
| func (dht *IpfsDHT) GetValuesAsync(ctx context.Context, key string, nvals int) <-chan *routing.RecvdVal { | ||
| var resChan chan *routing.RecvdVal = make(chan *routing.RecvdVal, 0) | ||
| var errChan chan error = make(chan error, 0) | ||
| go dht.getValuesAsyncRoutine(ctx, key, nvals, resChan, errChan) | ||
| go func() { | ||
| for err := range errChan { | ||
| log.Debugf("Query error: %s", err) | ||
| notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ | ||
| Type: notif.QueryError, | ||
| Extra: err.Error(), | ||
| }) | ||
| } | ||
| }() | ||
| return resChan | ||
| } | ||
|
|
||
| func (dht *IpfsDHT) getValuesAsyncRoutine(ctx context.Context, key string, nvals int, resChan chan<- *routing.RecvdVal, errChan chan<- error) { | ||
| var valslock sync.Mutex | ||
| var sentRes int | ||
|
|
||
| defer close(resChan) | ||
| if errChan != nil { | ||
| defer close(errChan) | ||
| } | ||
|
|
||
| // If we have it local, dont bother doing an RPC! | ||
| lrec, err := dht.getLocal(key) | ||
| if err == nil { | ||
| // TODO: this is tricky, we dont always want to trust our own value | ||
| // what if the authoritative source updated it? | ||
| log.Debug("have it locally") | ||
| vals = append(vals, routing.RecvdVal{ | ||
| sentRes = sentRes + 1 | ||
| resChan <- &routing.RecvdVal{ | ||
| Val: lrec.GetValue(), | ||
| From: dht.self, | ||
| }) | ||
| } | ||
|
|
||
| if nvals <= 1 { | ||
| return vals, nil | ||
| if nvals == 0 || nvals == 1 { | ||
| return | ||
| } | ||
| } else if nvals == 0 { | ||
| return nil, err | ||
| if errChan != nil { | ||
| errChan <- err | ||
| } | ||
| resChan <- &routing.RecvdVal{Error: err} | ||
| return | ||
| } | ||
|
|
||
| // get closest peers in the routing table | ||
| rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue) | ||
| log.Debugf("peers in rt: %d %s", len(rtp), rtp) | ||
| if len(rtp) == 0 { | ||
| log.Warning("No peers from routing table!") | ||
| return nil, kb.ErrLookupFailure | ||
| if errChan != nil { | ||
| errChan <- err | ||
| } | ||
| resChan <- &routing.RecvdVal{Error: kb.ErrLookupFailure} | ||
| return | ||
| } | ||
|
|
||
| // setup the Query | ||
|
|
@@ -205,11 +257,12 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou | |
| Val: rec.GetValue(), | ||
| From: p, | ||
| } | ||
| resChan <- &rv | ||
| valslock.Lock() | ||
| vals = append(vals, rv) | ||
| sentRes += 1 | ||
|
|
||
| // If weve collected enough records, we're done | ||
| if len(vals) >= nvals { | ||
| if sentRes >= nvals || nvals >= 0 { | ||
| res.success = true | ||
| } | ||
| valslock.Unlock() | ||
|
|
@@ -226,14 +279,15 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou | |
|
|
||
| // run it! | ||
| _, err = query.Run(ctx, rtp) | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. whyrusleeping:
mildred:
mildred/ipfs-objects@39c20b2#diff-50d2a8e6d11e785b103932ac6c926e72R274
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case where
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A few days ago, I understood more from your remark. I just fixed an issue where However, when there is no result, we might get an error from the lower layers, but that doesn't necessarily mean the GetValues must return an error. Instead it might mean that there is no result at all and we must not trigger an error. The previous version of the code silents some errors in this case, I see no reason to change that in this PR.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mildred right, i was mostly commenting that the existing behaviour might be incorrect. We can always change it in a different PR though. |
||
| if len(vals) == 0 { | ||
| if sentRes == 0 { | ||
| if err != nil { | ||
| return nil, err | ||
| if errChan != nil { | ||
| errChan <- err | ||
| } | ||
| resChan <- &routing.RecvdVal{Error: err} | ||
| } | ||
| } | ||
|
|
||
| return vals, nil | ||
|
|
||
| } | ||
|
|
||
| // Value provider layer of indirection. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whyrusleeping:
mildred:
mildred/ipfs-objects@39c20b2#diff-50d2a8e6d11e785b103932ac6c926e72R169
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You added the error field into the RecvdVal in the routing interface PR. Should we make use of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, I see getValuesAsyncRoutine will use it if error channel is not specified. Should we use it instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will use both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mildred is the errChan ever used? It seems a bit redundant at this point