Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize and refactor facet filtering #2829

Merged
merged 6 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,12 @@ func (l *List) hasPendingTxn() bool {
return false
}

// ApproxLen returns an approximate length for the list: the number of entries
// in l.mutationMap plus whatever codec.ApproxLen reports for l.plist.Pack.
// Both values are read while holding the list's read lock.
func (l *List) ApproxLen() int {
	l.RLock()
	defer l.RUnlock()

	mutations := len(l.mutationMap)
	packed := codec.ApproxLen(l.plist.Pack)
	return mutations + packed
}

// Uids returns the UIDs given some query params.
// We have to apply the filtering before applying (offset, count).
// WARNING: Calling this function just to get Uids is expensive
Expand Down
16 changes: 13 additions & 3 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
// Can happen in recurse query.
continue
}
if len(pc.facetsMatrix) > 0 && len(pc.facetsMatrix) != len(pc.uidMatrix) {
return x.Errorf("length of facetsMatrix and uidMatrix mismatch: %d vs %d",
len(pc.facetsMatrix), len(pc.uidMatrix))
}

idx := algo.IndexOf(pc.SrcUIDs, uid)
if idx < 0 {
Expand All @@ -411,8 +415,10 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
fieldName := pc.fieldName()
if len(pc.counts) > 0 {
addCount(pc, uint64(pc.counts[idx]), dst)

} else if pc.SrcFunc != nil && pc.SrcFunc.Name == "checkpwd" {
addCheckPwd(pc, pc.valueMatrix[idx].Values, dst)

} else if idx < len(pc.uidMatrix) && len(pc.uidMatrix[idx].Uids) > 0 {
var fcsList []*pb.Facets
if pc.Params.Facet != nil {
Expand Down Expand Up @@ -485,7 +491,7 @@ func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
continue
}

if pc.Params.Facet != nil && len(pc.facetsMatrix[idx].FacetsList) > 0 {
if len(pc.facetsMatrix) > idx && len(pc.facetsMatrix[idx].FacetsList) > 0 {
// in case of Value we have only one Facets
for _, f := range pc.facetsMatrix[idx].FacetsList[0].Facets {
fVal, err := facets.ValFor(f)
Expand Down Expand Up @@ -1279,7 +1285,7 @@ func (sg *SubGraph) populatePostAggregation(doneVars map[string]varValue, path [

// Filters might have updated the destination UIDs, so facetsMatrix must be updated as well.
func (sg *SubGraph) updateFacetMatrix() {
if sg.Params.Facet == nil {
if len(sg.facetsMatrix) != len(sg.uidMatrix) {
return
}

Expand Down Expand Up @@ -2205,9 +2211,13 @@ func (sg *SubGraph) updateDestUids() {
}

func (sg *SubGraph) sortAndPaginateUsingFacet(ctx context.Context) error {
if sg.facetsMatrix == nil {
if len(sg.facetsMatrix) == 0 {
return nil
}
if len(sg.facetsMatrix) != len(sg.uidMatrix) {
return x.Errorf("Facet matrix and UID matrix mismatch: %d vs %d",
len(sg.facetsMatrix), len(sg.uidMatrix))
}
orderby := sg.Params.FacetOrder
for i := 0; i < len(sg.uidMatrix); i++ {
ul := sg.uidMatrix[i]
Expand Down
2 changes: 1 addition & 1 deletion query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (sg *SubGraph) getCost(matrix, list int) (cost float64,
fcs *pb.Facets, rerr error) {

cost = 1.0
if sg.Params.Facet == nil {
if len(sg.facetsMatrix) <= matrix {
return cost, fcs, rerr
}
fcsList := sg.facetsMatrix[matrix].FacetsList
Expand Down
83 changes: 37 additions & 46 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ import (
)

var (
emptyUIDList pb.List
emptyResult pb.Result
emptyValueList = pb.ValueList{Values: []*pb.TaskValue{}}
emptyUIDList pb.List
emptyFacetsList pb.FacetsList
emptyResult pb.Result
emptyValueList = pb.ValueList{Values: []*pb.TaskValue{}}
)

func invokeNetworkRequest(
Expand Down Expand Up @@ -276,11 +277,6 @@ func needsIndex(fnType FuncType) bool {
}
}

// result holds a single posting's uid together with the facets copied for it.
type result struct {
// uid is taken from the posting's Uid field.
uid uint64
// facets is the output of facets.CopyFacets for that posting.
facets []*api.Facet
}

type funcArgs struct {
q *pb.Query
gid uint32
Expand Down Expand Up @@ -382,11 +378,11 @@ func handleValuePostings(ctx context.Context, args funcArgs) error {

if err == posting.ErrNoValue || len(vals) == 0 {
out.UidMatrix = append(out.UidMatrix, &emptyUIDList)
out.FacetMatrix = append(out.FacetMatrix, &emptyFacetsList)
if q.DoCount {
out.Counts = append(out.Counts, 0)
} else {
out.ValueMatrix = append(out.ValueMatrix, &emptyValueList)
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
Expand Down Expand Up @@ -445,6 +441,8 @@ func handleValuePostings(ctx context.Context, args funcArgs) error {
}
out.FacetMatrix = append(out.FacetMatrix,
&pb.FacetsList{FacetsList: []*pb.Facets{{Facets: fs}}})
} else {
out.FacetMatrix = append(out.FacetMatrix, &emptyFacetsList)
}

switch {
Expand Down Expand Up @@ -575,39 +573,6 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
return err
}

// get filtered uids and facets.
var filteredRes []*result

var perr error
filteredRes = make([]*result, 0)
err = pl.Postings(opts, func(p *pb.Posting) error {
res := true
res, perr = applyFacetsTree(p.Facets, facetsTree)
if perr != nil {
return posting.ErrStopIteration
}
if res {
filteredRes = append(filteredRes, &result{
uid: p.Uid,
facets: facets.CopyFacets(p.Facets, q.FacetParam)})
}
return nil // continue iteration.
})
if err != nil {
return err
} else if perr != nil {
return perr
}

// add facets to result.
if q.FacetParam != nil {
var fcsList []*pb.Facets
for _, fres := range filteredRes {
fcsList = append(fcsList, &pb.Facets{Facets: fres.facets})
}
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
}

switch {
case q.DoCount:
if i == 0 {
Expand Down Expand Up @@ -667,12 +632,38 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
if i == 0 {
span.Annotate(nil, "default")
}
// The more usual case: Getting the UIDs.
uidList := new(pb.List)
for _, fres := range filteredRes {
uidList.Uids = append(uidList.Uids, fres.uid)

uidList := &pb.List{
Uids: make([]uint64, 0, pl.ApproxLen()),
}

var fcsList []*pb.Facets
err = pl.Postings(opts, func(p *pb.Posting) error {
pick, err := applyFacetsTree(p.Facets, facetsTree)
if err != nil {
return err
}
if pick {
// TODO: This way of picking Uids differs from how
// pl.Uids works. So, have a look to see if we're
// catching all the edge cases here.
uidList.Uids = append(uidList.Uids, p.Uid)
if q.FacetParam != nil {
fcsList = append(fcsList, &pb.Facets{
Facets: facets.CopyFacets(p.Facets, q.FacetParam),
})
}
}
return nil // continue iteration.
})
if err != nil {
return err
}

out.UidMatrix = append(out.UidMatrix, uidList)
if q.FacetParam != nil {
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList})
}
}
}
return nil
Expand Down
24 changes: 12 additions & 12 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestProcessTaskIndexMLayer(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{10, 12},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand All @@ -180,9 +180,9 @@ func TestProcessTaskIndexMLayer(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{12},
nil,
{},
{10},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand Down Expand Up @@ -211,8 +211,8 @@ func TestProcessTaskIndexMLayer(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))

query = newQuery("friend", nil, []string{"anyofterms", "", "photon notphoton ignored"})
Expand All @@ -221,8 +221,8 @@ func TestProcessTaskIndexMLayer(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))
}

Expand All @@ -236,7 +236,7 @@ func TestProcessTaskIndex(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{10, 12},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand All @@ -258,9 +258,9 @@ func TestProcessTaskIndex(t *testing.T) {
require.NoError(t, err)

require.EqualValues(t, [][]uint64{
nil,
{},
{12},
nil,
{},
{10},
}, algo.ToUintsListForTest(r.UidMatrix))

Expand Down Expand Up @@ -289,8 +289,8 @@ func TestProcessTaskIndex(t *testing.T) {

require.EqualValues(t, [][]uint64{
{12},
nil,
nil,
{},
{},
}, algo.ToUintsListForTest(r.UidMatrix))
}

Expand Down