diff --git a/posting/lists.go b/posting/lists.go index 5f7f799a2dc..47f52c92102 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -292,7 +292,7 @@ func GetOrCreate(key []byte, group uint32) (rlist *List, decr func()) { // Get takes a key and a groupID. It checks if the in-memory map has an // updated value and returns it if it exists or it gets from the store and DOES NOT ADD to lhmap. -func Get(key []byte, gid uint32) (rlist *List, decr func()) { +func Get(key []byte) (rlist *List, decr func()) { fp := farm.Fingerprint64(key) lp := lcache.Get(fp) diff --git a/worker/sort.go b/worker/sort.go index 3e41095a750..47a4488225d 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -21,6 +21,7 @@ import ( "bytes" "fmt" "strings" + "time" "github.com/dgraph-io/badger" "golang.org/x/net/context" @@ -136,7 +137,7 @@ func sortWithoutIndex(ctx context.Context, ts *protos.SortMessage) (*protos.Sort default: // Copy, otherwise it'd affect the destUids and hence the srcUids of Next level. tempList := &protos.List{ts.UidMatrix[i].Uids} - if err := sortByValue(ts.Attr, ts.Langs, tempList, sType, ts.Desc); err != nil { + if err := sortByValue(ctx, ts, tempList, sType); err != nil { return r, err } paginate(int(ts.Offset), int(ts.Count), tempList) @@ -228,7 +229,7 @@ BUCKETS: } // Intersect every UID list with the index bucket, and update their // results (in out). - err := intersectBucket(ts, ts.Attr, token, out) + err := intersectBucket(ctx, ts, token, out) switch err { case errDone: break BUCKETS @@ -277,6 +278,13 @@ func processSort(ctx context.Context, ts *protos.SortMessage) (*protos.SortResul cctx, cancel := context.WithCancel(ctx) resCh := make(chan result, 2) go func() { + select { + case <-time.After(3 * time.Millisecond): + // Wait between ctx chan and time chan. + case <-ctx.Done(): + resCh <- result{err: ctx.Err()} + return + } r, err := sortWithoutIndex(cctx, ts) resCh <- result{ res: r, @@ -313,8 +321,10 @@ type intersectedList struct { // intersectBucket intersects every UID list in the UID matrix with the // indexed bucket. -func intersectBucket(ts *protos.SortMessage, attr, token string, out []intersectedList) error { +func intersectBucket(ctx context.Context, ts *protos.SortMessage, token string, + out []intersectedList) error { count := int(ts.Count) + attr := ts.Attr sType, err := schema.State().TypeOf(attr) if err != nil || !sType.IsScalar() { return x.Errorf("Cannot sort attribute %s of type object.", attr) @@ -322,7 +332,8 @@ func intersectBucket(ts *protos.SortMessage, attr, token string, out []intersect scalar := sType key := x.IndexKey(attr, token) - pl, decr := posting.GetOrCreate(key, 1) + // Don't put the Index keys in memory. + pl, decr := posting.Get(key) defer decr() // For each UID list, we need to intersect with the index bucket. @@ -349,7 +360,7 @@ func intersectBucket(ts *protos.SortMessage, attr, token string, out []intersect // We are within the page. We need to apply sorting. // Sort results by value before applying offset. - if err := sortByValue(attr, ts.Langs, result, scalar, ts.Desc); err != nil { + if err := sortByValue(ctx, ts, result, scalar); err != nil { return err } @@ -397,29 +408,35 @@ func paginate(offset, count int, dest *protos.List) { } // sortByValue fetches values and sort UIDList. -func sortByValue(attr string, langs []string, ul *protos.List, typ types.TypeID, desc bool) error { +func sortByValue(ctx context.Context, ts *protos.SortMessage, ul *protos.List, + typ types.TypeID) error { lenList := len(ul.Uids) uids := make([]uint64, 0, lenList) values := make([]types.Val, 0, lenList) for i := 0; i < lenList; i++ { - uid := ul.Uids[i] - val, err := fetchValue(uid, attr, langs, typ) - if err != nil { - // If a value is missing, skip that UID in the result. - continue + select { + case <-ctx.Done(): + return ctx.Err() + default: + uid := ul.Uids[i] + val, err := fetchValue(uid, ts.Attr, ts.Langs, typ) + if err != nil { + // If a value is missing, skip that UID in the result. + continue + } + uids = append(uids, uid) + values = append(values, val) } - uids = append(uids, uid) - values = append(values, val) } - err := types.Sort(values, &protos.List{uids}, desc) + err := types.Sort(values, &protos.List{uids}, ts.Desc) ul.Uids = uids return err } // fetchValue gets the value for a given UID. func fetchValue(uid uint64, attr string, langs []string, scalar types.TypeID) (types.Val, error) { - // TODO: Maybe use posting.Get - pl, decr := posting.Get(x.DataKey(attr, uid), group.BelongsTo(attr)) + // Don't put the values in memory + pl, decr := posting.Get(x.DataKey(attr, uid)) defer decr() src, err := pl.ValueFor(langs)