Skip to content

Commit

Permalink
Prevent alphas from asking zero to serve tablets during queries. (hyp…
Browse files Browse the repository at this point in the history
…ermodeinc#3091)

This change makes queries read-only in the sense that they won't ask
zero to serve any predicates.
Same change that was reverted due to a bug detected by running "dgraph
increment". The bug has been fixed by asking zero what group is serving
a particular tablet without requesting to serve that tablet. This
preserves both correctness and the read-only property of queries.
  • Loading branch information
martinmr authored and dna2github committed Jul 19, 2019
1 parent 3a74a4b commit 1ff152a
Show file tree
Hide file tree
Showing 17 changed files with 434 additions and 264 deletions.
7 changes: 6 additions & 1 deletion dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (s *Server) ShouldServe(
if len(tablet.Predicate) == 0 {
return resp, x.Errorf("Tablet predicate is empty in %+v", tablet)
}
if tablet.GroupId == 0 {
if tablet.GroupId == 0 && !tablet.ReadOnly {
return resp, x.Errorf("Group ID is Zero in %+v", tablet)
}

Expand All @@ -548,6 +548,11 @@ func (s *Server) ShouldServe(
// serving.
return tab, nil
}
if tab == nil && tablet.ReadOnly {
// Read-only requests should return an empty tablet instead of asking zero to serve
// the predicate.
return &pb.Tablet{}, nil
}

// Set the tablet to be served by this server's group.
var proposal pb.ZeroProposal
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ message Tablet {
bool force = 3; // Used while moving predicate.
int64 space = 7;
bool remove = 8;
bool read_only = 9; // If true, do not ask zero to serve any tablets.
}

message DirectedEdge {
Expand Down
460 changes: 251 additions & 209 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type SubGraph struct {
ReadTs uint64
Cache int
Attr string
UnknownAttr bool
Params params
counts []uint32
valueMatrix []*pb.ValueList
Expand Down Expand Up @@ -2031,7 +2032,9 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil {
if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) {
sg.UnknownAttr = true
} else if err != nil {
rch <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion query/query0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func TestQueryVarValOrderError(t *testing.T) {
`
_, err := processQuery(t, context.Background(), query)
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot sort attribute n of type object.")
require.Contains(t, err.Error(), "Cannot sort by unknown attribute n")
}

func TestQueryVarValOrderDesc(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions query/recurse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
return ctx.Err()
}

if start.UnknownAttr {
return nil
}

// Add children back and expand if necessary
if exec, err = expandChildren(ctx, start, startChildren); err != nil {
return err
Expand Down Expand Up @@ -90,6 +94,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
}

for _, sg := range exec {
if sg.UnknownAttr {
continue
}

if len(sg.Filters) > 0 {
// We need to do this in case we had some filters.
sg.updateUidMatrix()
Expand Down Expand Up @@ -126,6 +134,9 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
var out []*SubGraph
var exp []*SubGraph
for _, sg := range exec {
if sg.UnknownAttr == true {
continue
}
if len(sg.DestUIDs.Uids) == 0 {
continue
}
Expand Down
4 changes: 4 additions & 0 deletions query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (sg *SubGraph) expandOut(ctx context.Context,
rch <- ctx.Err()
return
default:
if subgraph.UnknownAttr {
continue
}

// Send the destuids in res chan.
for mIdx, fromUID := range subgraph.SrcUIDs.Uids {
for lIdx, toUID := range subgraph.uidMatrix[mIdx].Uids {
Expand Down
6 changes: 4 additions & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

if !groups().ServesTablet(s.Predicate) {
return fmt.Errorf("Group 1 should always serve reserved predicate %s",
if servesTablet, err := groups().ServesTablet(s.Predicate); err != nil {
return err
} else if !servesTablet {
return fmt.Errorf("group 1 should always serve reserved predicate %s",
s.Predicate)
}
}
Expand Down
2 changes: 1 addition & 1 deletion worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
if pk.Attr == "_predicate_" {
return false
}
if !groups().ServesTablet(pk.Attr) {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
// We need to ensure that schema keys are separately identifiable, so they can be
Expand Down
78 changes: 63 additions & 15 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (g *groupi) informZeroAboutTablets() {
failed := false
preds := schema.State().Predicates()
for _, pred := range preds {
if tablet := g.Tablet(pred); tablet == nil {
if tablet, err := g.Tablet(pred); err != nil {
failed = true
glog.Errorf("Error while getting tablet for pred %q: %v", pred, err)
} else if tablet == nil {
failed = true
}
}
Expand Down Expand Up @@ -308,31 +311,76 @@ func (g *groupi) ChecksumsMatch(ctx context.Context) error {
}
}

func (g *groupi) BelongsTo(key string) uint32 {
tablet := g.Tablet(key)
func (g *groupi) BelongsTo(key string) (uint32, error) {
if tablet, err := g.Tablet(key); err != nil {
return 0, err
} else if tablet != nil {
return tablet.GroupId, nil
}
return 0, nil
}

// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
// the tablet for key if no group is currently serving it.
func (g *groupi) BelongsToReadOnly(key string) (uint32, error) {
g.RLock()
tablet := g.tablets[key]
g.RUnlock()
if tablet != nil {
return tablet.GroupId
return tablet.GetGroupId(), nil
}
return 0

// We don't know about this tablet. Talk to dgraphzero to find out who is
// serving this tablet.
pl := g.connToZeroLeader()
zc := pb.NewZeroClient(pl.Get())

tablet = &pb.Tablet{
Predicate: key,
ReadOnly: true,
}
out, err := zc.ShouldServe(context.Background(), tablet)
if err != nil {
glog.Errorf("Error while ShouldServe grpc call %v", err)
return 0, err
}
if out.GetGroupId() == 0 {
return 0, nil
}

g.Lock()
defer g.Unlock()
g.tablets[key] = out
return out.GetGroupId(), nil
}

func (g *groupi) ServesTablet(key string) (bool, error) {
if tablet, err := g.Tablet(key); err != nil {
return false, err
} else if tablet != nil && tablet.GroupId == groups().groupId() {
return true, nil
}
return false, nil
}

func (g *groupi) ServesTablet(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && tablet.GroupId == groups().groupId() {
return true
// ServesTabletReadOnly acts like ServesTablet except it does not ask zero to
// serve the tablet for key if no group is currently serving it.
func (g *groupi) ServesTabletReadOnly(key string) (bool, error) {
gid, err := g.BelongsToReadOnly(key)
if err != nil {
return false, err
}
return false
return gid == groups().groupId(), nil
}

// Do not modify the returned Tablet
// TODO: This should return error.
func (g *groupi) Tablet(key string) *pb.Tablet {
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
// TODO: Remove all this later, create a membership state and apply it
g.RLock()
tablet, ok := g.tablets[key]
g.RUnlock()
if ok {
return tablet
return tablet, nil
}

// We don't know about this tablet.
Expand All @@ -344,7 +392,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet {
out, err := zc.ShouldServe(context.Background(), tablet)
if err != nil {
glog.Errorf("Error while ShouldServe grpc call %v", err)
return nil
return nil, err
}
g.Lock()
g.tablets[key] = out
Expand All @@ -353,7 +401,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet {
if out.GroupId == groups().groupId() {
glog.Infof("Serving tablet for: %v\n", key)
}
return out
return out, nil
}

func (g *groupi) HasMeInState() bool {
Expand Down
30 changes: 22 additions & 8 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
)

var (
errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.")
ErrUnservedTabletMessage = "Tablet isn't being served by this instance"
errUnservedTablet = x.Errorf(ErrUnservedTabletMessage)
)

func isStarAll(v []byte) bool {
Expand Down Expand Up @@ -111,10 +112,12 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin
}

func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error {
if !groups().ServesTablet(update.Predicate) {
tablet := groups().Tablet(update.Predicate)
if tablet, err := groups().Tablet(update.Predicate); err != nil {
return err
} else if tablet.GetGroupId() != groups().groupId() {
return x.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet)
}

if err := checkSchema(update); err != nil {
return err
}
Expand Down Expand Up @@ -442,10 +445,14 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re

// populateMutationMap populates a map from group id to the mutation that
// should be sent to that group.
func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) {
mm := make(map[uint32]*pb.Mutations)
for _, edge := range src.Edges {
gid := groups().BelongsTo(edge.Attr)
gid, err := groups().BelongsTo(edge.Attr)
if err != nil {
return nil, err
}

mu := mm[gid]
if mu == nil {
mu = &pb.Mutations{GroupId: gid}
Expand All @@ -455,7 +462,11 @@ func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
}

for _, schema := range src.Schema {
gid := groups().BelongsTo(schema.Predicate)
gid, err := groups().BelongsTo(schema.Predicate)
if err != nil {
return nil, err
}

mu := mm[gid]
if mu == nil {
mu = &pb.Mutations{GroupId: gid}
Expand Down Expand Up @@ -487,7 +498,7 @@ func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
}
}

return mm
return mm, nil
}

func commitOrAbort(ctx context.Context, startTs, commitTs uint64) error {
Expand All @@ -511,7 +522,10 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
defer span.End()

tctx := &api.TxnContext{StartTs: m.StartTs}
mutationMap := populateMutationMap(m)
mutationMap, err := populateMutationMap(m)
if err != nil {
return tctx, err
}

resCh := make(chan res, len(mutationMap))
for gid, mu := range mutationMap {
Expand Down
3 changes: 2 additions & 1 deletion worker/mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func TestPopulateMutationMap(t *testing.T) {
}}
m := &pb.Mutations{Edges: edges, Schema: schema}

mutationsMap := populateMutationMap(m)
mutationsMap, err := populateMutationMap(m)
require.NoError(t, err)
mu := mutationsMap[1]
require.NotNil(t, mu)
require.NotNil(t, mu.Edges)
Expand Down
4 changes: 3 additions & 1 deletion worker/predicate_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
if err := posting.Oracle().WaitForTs(ctx, in.TxnTs); err != nil {
return &emptyPayload, x.Errorf("While waiting for txn ts: %d. Error: %v", in.TxnTs, err)
}
if !groups().ServesTablet(in.Predicate) {
if servesTablet, err := groups().ServesTablet(in.Predicate); err != nil {
return &emptyPayload, err
} else if !servesTablet {
return &emptyPayload, errUnservedTablet
}

Expand Down
5 changes: 3 additions & 2 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
var noTimeout bool

checkTablet := func(pred string) error {
if tablet := groups().Tablet(pred); tablet == nil ||
tablet.GroupId != groups().groupId() {
if tablet, err := groups().Tablet(pred); err != nil {
return err
} else if tablet == nil || tablet.GroupId != groups().groupId() {
return errUnservedTablet
}
return nil
Expand Down
Loading

0 comments on commit 1ff152a

Please sign in to comment.