Conversation
|
Tell me when you want this reviewed. |
bcb8474 to
75bae8d
Compare
|
Should be ready |
| return nil, routing.ErrNotFound | ||
| responsesNeeded := 0 | ||
| if !cfg.Offline { | ||
| responsesNeeded = getQuorum(&cfg) |
There was a problem hiding this comment.
Not sure if we should be applying the quorum in here. That's more of a GetValue thing. Really, we should probably have a separate "limit" option that defaults to -1. We can then get the quorum in GetValue and then apply it as the limit. Otherwise, we'll always stop after the default quorum (if not specified).
There was a problem hiding this comment.
Note: by default, we should continue searching until we run out of peers.
There was a problem hiding this comment.
Alternatively, we may just want to make GetValue and SearchValue orthogonal (using the same GetValues under the covers).
| select { | ||
| case r, ok := <-responses: | ||
| if !ok { | ||
| responses = nil |
There was a problem hiding this comment.
It would lead to occasionally ignoring errors
There was a problem hiding this comment.
Ah. I didn't see the && below.
Actually, do we even need an error channel? I'd just return an error up front (valid key, non-empty routing table, etc.) and then close when done. At that point, I don't see how we can really "fail". If the user wants more context, they can register a notifier.
Is that possible? It'll be a lot nicer to the user.
|
|
||
| // GetValues gets nvals values corresponding to the given key. | ||
| func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { | ||
| func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) <-chan RecvdVal { |
There was a problem hiding this comment.
We may not want to change the API here. For now, I'd just add a private getValues function and make GetValue a simple wrapper.
4ca5152 to
09f5251
Compare
|
On quorum on |
Yeah, you're probably right. However, I'd default to "continue till we run out of peers" instead of the default quorum of 16. |
790b5ca to
f02c32e
Compare
Done |
| } | ||
|
|
||
| if err != io.EOF { | ||
| if err != routing.ErrNotFound { |
There was a problem hiding this comment.
this should be a context.ErrDeadlineExceeded. We'll probably need to check if the context was canceled in GetValue when the channel is closed.
| if responsesNeeded < 0 { | ||
| responsesNeeded = 0 | ||
| } | ||
| vals := make([]RecvdVal, 0, responsesNeeded) |
There was a problem hiding this comment.
This could grow very large (with no quorum). We may want to fire off a round of corrections ocationally (i.e., see 30 values, correct peers and reset). However, we can probably leave that as a TODO for now.
There was a problem hiding this comment.
Alternatively, we could just cap responsesNeeded for now. Actually, that's probably the best thing to do. Cap it at 50 or something.
There was a problem hiding this comment.
Leaving it unbounded could also cause a bunch of goroutines below.
| best = r | ||
| } | ||
| if len(recs) == 0 { | ||
|
|
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| if err != nil { | ||
| continue //TODO: Do we want to do something with the error here? | ||
| } | ||
| if i != best && !bytes.Equal(v.Val, vals[best].Val) { |
There was a problem hiding this comment.
I will be either 0 or 1, not best.
| if best > -1 { | ||
| i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val}) | ||
| if err != nil { | ||
| continue //TODO: Do we want to do something with the error here? |
There was a problem hiding this comment.
Log an error. This should never happen. Honestly, we could just panic (but I'd rather not).
| out = append(out, val) | ||
| } | ||
|
|
||
| return out, nil |
There was a problem hiding this comment.
Needs to return this plus ctx.Error() (the context could have been canceled, etc.).
aeecd87 to
c22e1e1
Compare
| record "github.com/libp2p/go-libp2p-record" | ||
| routing "github.com/libp2p/go-libp2p-routing" | ||
| mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" | ||
| "github.com/libp2p/go-libp2p-record" |
There was a problem hiding this comment.
Given that we never really use consistent naming, in package names, I'd rather leave the explicit names in-place.
| } | ||
|
|
||
| if err != io.EOF { | ||
| if err != context.DeadlineExceeded { |
| if len(vals) < maxVals { | ||
| vals = append(vals, v) | ||
| } else { | ||
| i = (best + 1) % maxVals |
There was a problem hiding this comment.
i could still be out of bounds. It may be better to simply put the "best" value in a separate variable and then keep an array of peers to correct.
| } | ||
|
|
||
| func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) { | ||
| eip := log.EventBegin(ctx, "GetValues") |
There was a problem hiding this comment.
This should probably go into GetValues. That may also help simplify/get rid of the done dance.
| } | ||
| valslock.Lock() | ||
| vals = append(vals, rv) | ||
| vals <- rv |
There was a problem hiding this comment.
This should select on the context.
Stebalien
left a comment
There was a problem hiding this comment.
Missed two blocking channel writes.
| } | ||
| if sel == 1 && !bytes.Equal(v.Val, best.Val) { | ||
| best = &v | ||
| out <- v.Val |
There was a problem hiding this comment.
Needs to select on the context.
| if err := dht.Validator.Validate(key, v.Val); err == nil { | ||
| best = &v | ||
| out <- v.Val | ||
| } |
3091c6b to
246579e
Compare
Stebalien
left a comment
There was a problem hiding this comment.
LGTM. I have a few nits and cleanups but this should work. We can do that in a separate PR.
This exposes new function which streams entries as they are found - https://github.com/libp2p/go-libp2p-routing/blob/master/routing.go#L61
Part of ipfs/kubo#5232
Replaces #49
TODO: