Skip to content

Commit

Permalink
Changes in worker sort
Browse files Browse the repository at this point in the history
* Check context often in sortWithoutIndex to return quickly
* Start sortWithoutIndex after some time gap
  • Loading branch information
ashwin95r committed Jul 14, 2017
1 parent 57815c7 commit a25947b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
2 changes: 1 addition & 1 deletion posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) {

// Get takes a key and a groupID. It checks if the in-memory map has an
// updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap.
func Get(key []byte, gid uint32) (rlist *List, decr func()) {
func Get(key []byte) (rlist *List, decr func()) {
fp := farm.Fingerprint64(key)
lp := lcache.Get(fp)

Expand Down
49 changes: 33 additions & 16 deletions worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"fmt"
"strings"
"time"

"github.com/dgraph-io/badger"
"golang.org/x/net/context"
Expand Down Expand Up @@ -136,7 +137,7 @@ func sortWithoutIndex(ctx context.Context, ts *protos.SortMessage) (*protos.Sort
default:
// Copy, otherwise it'd affect the destUids and hence the srcUids of Next level.
tempList := &protos.List{ts.UidMatrix[i].Uids}
if err := sortByValue(ts.Attr, ts.Langs, tempList, sType, ts.Desc); err != nil {
if err := sortByValue(ctx, ts, tempList, sType); err != nil {
return r, err
}
paginate(int(ts.Offset), int(ts.Count), tempList)
Expand Down Expand Up @@ -228,7 +229,7 @@ BUCKETS:
}
// Intersect every UID list with the index bucket, and update their
// results (in out).
err := intersectBucket(ts, ts.Attr, token, out)
err := intersectBucket(ctx, ts, token, out)
switch err {
case errDone:
break BUCKETS
Expand Down Expand Up @@ -277,6 +278,13 @@ func processSort(ctx context.Context, ts *protos.SortMessage) (*protos.SortResul
cctx, cancel := context.WithCancel(ctx)
resCh := make(chan result, 2)
go func() {
select {
case <-time.After(3 * time.Millisecond):
// Wait between ctx chan and time chan.
case <-ctx.Done():
resCh <- result{err: ctx.Err()}
return
}
r, err := sortWithoutIndex(cctx, ts)
resCh <- result{
res: r,
Expand Down Expand Up @@ -313,16 +321,19 @@ type intersectedList struct {

// intersectBucket intersects every UID list in the UID matrix with the
// indexed bucket.
func intersectBucket(ts *protos.SortMessage, attr, token string, out []intersectedList) error {
func intersectBucket(ctx context.Context, ts *protos.SortMessage, token string,
out []intersectedList) error {
count := int(ts.Count)
attr := ts.Attr
sType, err := schema.State().TypeOf(attr)
if err != nil || !sType.IsScalar() {
return x.Errorf("Cannot sort attribute %s of type object.", attr)
}
scalar := sType

key := x.IndexKey(attr, token)
pl, decr := posting.GetOrCreate(key, 1)
// Don't put the Index keys in memory.
pl, decr := posting.Get(key)
defer decr()

// For each UID list, we need to intersect with the index bucket.
Expand All @@ -349,7 +360,7 @@ func intersectBucket(ts *protos.SortMessage, attr, token string, out []intersect

// We are within the page. We need to apply sorting.
// Sort results by value before applying offset.
if err := sortByValue(attr, ts.Langs, result, scalar, ts.Desc); err != nil {
if err := sortByValue(ctx, ts, result, scalar); err != nil {
return err
}

Expand Down Expand Up @@ -397,29 +408,35 @@ func paginate(offset, count int, dest *protos.List) {
}

// sortByValue fetches values and sort UIDList.
func sortByValue(attr string, langs []string, ul *protos.List, typ types.TypeID, desc bool) error {
func sortByValue(ctx context.Context, ts *protos.SortMessage, ul *protos.List,
typ types.TypeID) error {
lenList := len(ul.Uids)
uids := make([]uint64, 0, lenList)
values := make([]types.Val, 0, lenList)
for i := 0; i < lenList; i++ {
uid := ul.Uids[i]
val, err := fetchValue(uid, attr, langs, typ)
if err != nil {
// If a value is missing, skip that UID in the result.
continue
select {
case <-ctx.Done():
return ctx.Err()
default:
uid := ul.Uids[i]
val, err := fetchValue(uid, ts.Attr, ts.Langs, typ)
if err != nil {
// If a value is missing, skip that UID in the result.
continue
}
uids = append(uids, uid)
values = append(values, val)
}
uids = append(uids, uid)
values = append(values, val)
}
err := types.Sort(values, &protos.List{uids}, desc)
err := types.Sort(values, &protos.List{uids}, ts.Desc)
ul.Uids = uids
return err
}

// fetchValue gets the value for a given UID.
func fetchValue(uid uint64, attr string, langs []string, scalar types.TypeID) (types.Val, error) {
// TODO: Maybe use posting.Get
pl, decr := posting.Get(x.DataKey(attr, uid), group.BelongsTo(attr))
// Don't put the values in memory
pl, decr := posting.Get(x.DataKey(attr, uid))
defer decr()

src, err := pl.ValueFor(langs)
Expand Down

0 comments on commit a25947b

Please sign in to comment.