Skip to content

Commit

Permalink
Merge pull request #129 from ipfs/fix/sync-query
Browse files Browse the repository at this point in the history
sync: apply entire query while locked
  • Loading branch information
Stebalien authored Apr 18, 2019
2 parents ef49136 + 241b4af commit b19d692
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
23 changes: 9 additions & 14 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,17 @@ func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Re

// ResultsWithEntries returns a Results object from a list of entries
func ResultsWithEntries(q Query, res []Entry) Results {
b := NewResultBuilder(q)

// go consume all the entries and add them to the results.
b.Process.Go(func(worker goprocess.Process) {
for _, e := range res {
select {
case b.Output <- Result{Entry: e}:
case <-worker.Closing(): // client told us to close early
return
i := 0
return ResultsFromIterator(q, Iterator{
Next: func() (Result, bool) {
if i >= len(res) {
return Result{}, false
}
}
return
next := res[i]
i++
return Result{Entry: next}, true
},
})

go b.Process.CloseAfterChildren()
return b.Results()
}

func ResultsReplaceQuery(r Results, q Query) Results {
Expand Down
21 changes: 19 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,28 @@ func (d *MutexDatastore) Delete(key ds.Key) (err error) {
return d.child.Delete(key)
}

// KeyList implements Datastore.KeyList
// Query implements Datastore.Query
func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
d.RLock()
defer d.RUnlock()
return d.child.Query(q)

// Apply the entire query while locked. Non-sync datastores may not
// allow concurrent queries.

results, err := d.child.Query(q)
if err != nil {
return nil, err
}

entries, err1 := results.Rest()
err2 := results.Close()
switch {
case err1 != nil:
return nil, err1
case err2 != nil:
return nil, err2
}
return dsq.ResultsWithEntries(q, entries), nil
}

func (d *MutexDatastore) Batch() (ds.Batch, error) {
Expand Down

0 comments on commit b19d692

Please sign in to comment.