Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent alphas from asking zero to serve tablets during queries. #3091

Merged
merged 10 commits into from
Mar 15, 2019
21 changes: 21 additions & 0 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,27 @@ func (s *Server) ShouldServe(
return tab, nil
}

// Serves reports which tablet is currently serving the predicate named in the
// given tablet.
//
// It returns an error when the predicate is empty. Otherwise it returns the
// tablet found by ServingTablet, or, when no tablet is found, a placeholder
// tablet whose GroupId is 0 (never a nil tablet with a nil error).
func (s *Server) Serves(
	ctx context.Context, tablet *pb.Tablet) (*pb.Tablet, error) {
	ctx, span := otrace.StartSpan(ctx, "Zero.Serves")
	defer span.End()

	predicate := tablet.Predicate
	if predicate == "" {
		return nil, x.Errorf("Tablet predicate is empty in %+v", tablet)
	}

	// Look up the current owner and record the lookup result on the span
	// before deciding what to hand back.
	serving := s.ServingTablet(predicate)
	span.Annotatef(nil, "Tablet for %s: %+v", predicate, serving)
	if serving == nil {
		// Nobody serves this predicate: signal that with GroupId 0.
		return &pb.Tablet{GroupId: 0}, nil
	}
	return serving, nil
}

func (s *Server) UpdateMembership(ctx context.Context, group *pb.Group) (*api.Payload, error) {
proposals, err := s.createProposals(group)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ service Zero {

rpc Oracle (api.Payload) returns (stream OracleDelta) {}
rpc ShouldServe (Tablet) returns (Tablet) {}
rpc Serves (Tablet) returns (Tablet) {}
rpc AssignUids (Num) returns (AssignedIds) {}
rpc Timestamps (Num) returns (AssignedIds) {}
rpc CommitOrAbort (api.TxnContext) returns (api.TxnContext) {}
Expand Down
442 changes: 238 additions & 204 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Function struct {
type SubGraph struct {
ReadTs uint64
Attr string
UnknownAttr bool
Params params
counts []uint32
valueMatrix []*pb.ValueList
Expand Down Expand Up @@ -2025,7 +2026,9 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil {
if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) {
sg.UnknownAttr = true
} else if err != nil {
rch <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion query/query0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ func TestQueryVarValOrderError(t *testing.T) {
`
_, err := processQuery(t, context.Background(), query)
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot sort attribute n of type object.")
require.Contains(t, err.Error(), "Cannot sort by unknown attribute n")
}

func TestQueryVarValOrderDesc(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions query/recurse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
return ctx.Err()
}

if start.UnknownAttr {
return nil
}

// Add children back and expand if necessary
if exec, err = expandChildren(ctx, start, startChildren); err != nil {
return err
Expand Down Expand Up @@ -90,6 +94,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
}

for _, sg := range exec {
if sg.UnknownAttr {
continue
}

if len(sg.Filters) > 0 {
// We need to do this in case we had some filters.
sg.updateUidMatrix()
Expand Down Expand Up @@ -126,6 +134,9 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
var out []*SubGraph
var exp []*SubGraph
for _, sg := range exec {
if sg.UnknownAttr == true {
continue
}
if len(sg.DestUIDs.Uids) == 0 {
continue
}
Expand Down
4 changes: 4 additions & 0 deletions query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (sg *SubGraph) expandOut(ctx context.Context,
rch <- ctx.Err()
return
default:
if subgraph.UnknownAttr {
continue
}

// Send the destuids in res chan.
for mIdx, fromUID := range subgraph.SrcUIDs.Uids {
for lIdx, toUID := range subgraph.uidMatrix[mIdx].Uids {
Expand Down
37 changes: 37 additions & 0 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,37 @@ func (g *groupi) BelongsTo(key string) uint32 {
return 0
}

// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
// the tablet for key if no group is currently serving it.
//
// It returns the id of the group serving key, or 0 when the Serves call to
// zero fails or zero reports GroupId 0 for the tablet. A non-zero answer from
// zero is stored in g.tablets so later lookups are answered locally.
func (g *groupi) BelongsToReadOnly(key string) uint32 {
	// Fast path: answer from the local tablet map under the read lock.
	g.RLock()
	tablet := g.tablets[key]
	g.RUnlock()
	if tablet != nil {
		return tablet.GetGroupId()
	}

	// We don't know about this tablet. Talk to dgraphzero to find out who is
	// serving this tablet.
	pl := g.connToZeroLeader()
	zc := pb.NewZeroClient(pl.Get())

	tablet = &pb.Tablet{Predicate: key}
	out, err := zc.Serves(context.Background(), tablet)
	if err != nil {
		// Name the RPC that was actually issued (Serves, not ShouldServe) so
		// the log line points at the right call.
		glog.Errorf("Error while Serves grpc call %v", err)
		return 0
	}
	if out.GetGroupId() == 0 {
		// GroupId 0 is not cached, so the next lookup asks zero again.
		return 0
	}

	g.Lock()
	defer g.Unlock()
	g.tablets[key] = out
	return out.GetGroupId()
}

func (g *groupi) ServesTablet(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && tablet.GroupId == groups().groupId() {
Expand All @@ -331,6 +362,12 @@ func (g *groupi) ServesTablet(key string) bool {
return false
}

// ServesTabletReadOnly reports whether the group returned by
// BelongsToReadOnly for key is this instance's own group. Like
// BelongsToReadOnly, and unlike ServesTablet, it does not ask zero to serve
// the tablet when no group is currently serving it.
func (g *groupi) ServesTabletReadOnly(key string) bool {
	owner := g.BelongsToReadOnly(key)
	return owner == groups().groupId()
}

// Do not modify the returned Tablet
// TODO: This should return error.
func (g *groupi) Tablet(key string) *pb.Tablet {
Expand Down
3 changes: 2 additions & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
)

var (
errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.")
ErrUnservedTabletMessage = "Tablet isn't being served by this instance"
errUnservedTablet = x.Errorf(ErrUnservedTabletMessage)
)

func isStarAll(v []byte) bool {
Expand Down
2 changes: 1 addition & 1 deletion worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, erro
for _, attr := range predicates {
// This can happen after a predicate is moved. We don't delete predicate from schema state
// immediately. So lets ignore this predicate.
if !groups().ServesTablet(attr) {
if !groups().ServesTabletReadOnly(attr) {
continue
}
if schemaNode := populateSchema(attr, fields); schemaNode != nil {
Expand Down
6 changes: 5 additions & 1 deletion worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ type sortresult struct {

// SortOverNetwork sends sort query over the network.
func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error) {
gid := groups().BelongsTo(q.Order[0].Attr)
gid := groups().BelongsToReadOnly(q.Order[0].Attr)
if gid == 0 {
return nil, fmt.Errorf("Cannot sort by unknown attribute %s", q.Order[0].Attr)
}

if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "worker.SortOverNetwork. Attr: %s. Group: %d", q.Order[0].Attr, gid)
}
Expand Down
26 changes: 18 additions & 8 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ func processWithBackupRequest(
// query.
func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) {
attr := q.Attr
gid := groups().BelongsTo(attr)
gid := groups().BelongsToReadOnly(attr)
if gid == 0 {
return &pb.Result{}, errUnservedTablet
return &emptyResult, errUnservedTablet
}

span := otrace.FromContext(ctx)
if span != nil {
span.Annotatef(nil, "ProcessTaskOverNetwork. attr: %v gid: %v, readTs: %d, node id: %d",
Expand All @@ -157,6 +158,9 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error
return c.ServeTask(ctx, q)
})

if err == errUnservedTablet {
return &emptyResult, errUnservedTablet
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -723,11 +727,13 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
}
span.Annotatef(nil, "Done waiting for checksum match")

// If a group stops serving tablet and it gets partitioned away from group zero, then it
// wouldn't know that this group is no longer serving this predicate.
// There's no issue if a we are serving a particular tablet and we get partitioned away from
// group zero as long as it's not removed.
if !groups().ServesTablet(q.Attr) {
// If a group stops serving a tablet and it gets partitioned away from group
// zero, then it wouldn't know that this group is no longer serving this
// predicate. There's no issue if we are serving a particular tablet and
// we get partitioned away from group zero as long as it's not removed.
// ServesTabletReadOnly is called instead of ServesTablet to prevent this
// alpha from requesting to serve this tablet.
if !groups().ServesTabletReadOnly(q.Attr) {
return &emptyResult, errUnservedTablet
}
qs := queryState{cache: posting.Oracle().CacheAt(q.ReadTs)}
Expand Down Expand Up @@ -1607,7 +1613,11 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
return &emptyResult, ctx.Err()
}

gid := groups().BelongsTo(q.Attr)
gid := groups().BelongsToReadOnly(q.Attr)
if gid == 0 {
return &emptyResult, errUnservedTablet
}

var numUids int
if q.UidList != nil {
numUids = len(q.UidList.Uids)
Expand Down