Skip to content

Commit

Permalink
Prevent alphas from asking zero to serve tablets during queries. (#3055)
Browse files Browse the repository at this point in the history
Currently ServesTablet and BelongsTo ask zero to serve the tablet if no
predicate is currently serving it. This behavior should not happen
during queries as that means that queries can modify the state of the
alphas.

This change introduces read-only versions of those two methods to be
used during queries.

I manually verified that tablets for non-existent predicates no longer
ask zero to serve the tablet.
  • Loading branch information
martinmr authored Feb 27, 2019
1 parent ddb0d76 commit 1bde8d5
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 14 deletions.
5 changes: 4 additions & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Function struct {
type SubGraph struct {
ReadTs uint64
Attr string
UnknownAttr bool
Params params
counts []uint32
valueMatrix []*pb.ValueList
Expand Down Expand Up @@ -2025,7 +2026,9 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil {
if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) {
sg.UnknownAttr = true
} else if err != nil {
rch <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion query/query0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ func TestQueryVarValOrderError(t *testing.T) {
`
_, err := processQuery(t, context.Background(), query)
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot sort attribute n of type object.")
require.Contains(t, err.Error(), "Cannot sort by unknown attribute n")
}

func TestQueryVarValOrderDesc(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions query/recurse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
return ctx.Err()
}

if start.UnknownAttr {
return nil
}

// Add children back and expand if necessary
if exec, err = expandChildren(ctx, start, startChildren); err != nil {
return err
Expand Down Expand Up @@ -90,6 +94,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
}

for _, sg := range exec {
if sg.UnknownAttr {
continue
}

if len(sg.Filters) > 0 {
// We need to do this in case we had some filters.
sg.updateUidMatrix()
Expand Down Expand Up @@ -126,6 +134,9 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
var out []*SubGraph
var exp []*SubGraph
for _, sg := range exec {
if sg.UnknownAttr == true {
continue
}
if len(sg.DestUIDs.Uids) == 0 {
continue
}
Expand Down
4 changes: 4 additions & 0 deletions query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (sg *SubGraph) expandOut(ctx context.Context,
rch <- ctx.Err()
return
default:
if subgraph.UnknownAttr {
continue
}

// Send the destuids in res chan.
for mIdx, fromUID := range subgraph.SrcUIDs.Uids {
for lIdx, toUID := range subgraph.uidMatrix[mIdx].Uids {
Expand Down
15 changes: 15 additions & 0 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,15 @@ func (g *groupi) BelongsTo(key string) uint32 {
return 0
}

// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
// the tablet for key if no group is currently serving it.
func (g *groupi) BelongsToReadOnly(key string) uint32 {
g.RLock()
defer g.RUnlock()
tablet := g.tablets[key]
return tablet.GetGroupId()
}

func (g *groupi) ServesTablet(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && tablet.GroupId == groups().groupId() {
Expand All @@ -330,6 +339,12 @@ func (g *groupi) ServesTablet(key string) bool {
return false
}

// ServesTabletReadOnly acts like ServesTablet except it does not ask zero to
// serve the tablet for key if no group is currently serving it.
func (g *groupi) ServesTabletReadOnly(key string) bool {
return g.BelongsToReadOnly(key) == groups().groupId()
}

// Do not modify the returned Tablet
// TODO: This should return error.
func (g *groupi) Tablet(key string) *pb.Tablet {
Expand Down
4 changes: 2 additions & 2 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
)

var (
errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.")
errPredicateMoving = x.Errorf("Predicate is being moved. Please retry later")
ErrUnservedTabletMessage = "Tablet isn't being served by this instance"
errUnservedTablet = x.Errorf(ErrUnservedTabletMessage)
)

func isStarAll(v []byte) bool {
Expand Down
2 changes: 1 addition & 1 deletion worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, erro
for _, attr := range predicates {
// This can happen after a predicate is moved. We don't delete predicate from schema state
// immediately. So lets ignore this predicate.
if !groups().ServesTablet(attr) {
if !groups().ServesTabletReadOnly(attr) {
continue
}
if schemaNode := populateSchema(attr, fields); schemaNode != nil {
Expand Down
6 changes: 5 additions & 1 deletion worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ type sortresult struct {

// SortOverNetwork sends sort query over the network.
func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error) {
gid := groups().BelongsTo(q.Order[0].Attr)
gid := groups().BelongsToReadOnly(q.Order[0].Attr)
if gid == 0 {
return nil, fmt.Errorf("Cannot sort by unknown attribute %s", q.Order[0].Attr)
}

if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "worker.SortOverNetwork. Attr: %s. Group: %d", q.Order[0].Attr, gid)
}
Expand Down
26 changes: 18 additions & 8 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ func processWithBackupRequest(
// query.
func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error) {
attr := q.Attr
gid := groups().BelongsTo(attr)
gid := groups().BelongsToReadOnly(attr)
if gid == 0 {
return &pb.Result{}, errUnservedTablet
return &emptyResult, errUnservedTablet
}

span := otrace.FromContext(ctx)
if span != nil {
span.Annotatef(nil, "ProcessTaskOverNetwork. attr: %v gid: %v, readTs: %d, node id: %d",
Expand All @@ -157,6 +158,9 @@ func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error
return c.ServeTask(ctx, q)
})

if err == errUnservedTablet {
return &emptyResult, errUnservedTablet
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -722,11 +726,13 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro
q.Attr, q.ReadTs, maxAssigned)
}

// If a group stops serving tablet and it gets partitioned away from group zero, then it
// wouldn't know that this group is no longer serving this predicate.
// There's no issue if a we are serving a particular tablet and we get partitioned away from
// group zero as long as it's not removed.
if !groups().ServesTablet(q.Attr) {
// If a group stops serving tablet and it gets partitioned away from group
// zero, then it wouldn't know that this group is no longer serving this
// predicate. There's no issue if a we are serving a particular tablet and
// we get partitioned away from group zero as long as it's not removed.
// ServesTabletReadOnly is called instead of ServesTablet to prevent this
// alpha from requesting to serve this tablet.
if !groups().ServesTabletReadOnly(q.Attr) {
return &emptyResult, errUnservedTablet
}
qs := queryState{cache: posting.Oracle().CacheAt(q.ReadTs)}
Expand Down Expand Up @@ -1606,7 +1612,11 @@ func (w *grpcWorker) ServeTask(ctx context.Context, q *pb.Query) (*pb.Result, er
return &emptyResult, ctx.Err()
}

gid := groups().BelongsTo(q.Attr)
gid := groups().BelongsToReadOnly(q.Attr)
if gid == 0 {
return &emptyResult, errUnservedTablet
}

var numUids int
if q.UidList != nil {
numUids = len(q.UidList.Uids)
Expand Down

0 comments on commit 1bde8d5

Please sign in to comment.