Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent alphas from asking zero to serve tablets during queries. #3091

Merged
merged 10 commits into from
Mar 15, 2019
7 changes: 6 additions & 1 deletion dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (s *Server) ShouldServe(
if len(tablet.Predicate) == 0 {
return resp, x.Errorf("Tablet predicate is empty in %+v", tablet)
}
if tablet.GroupId == 0 {
if tablet.GroupId == 0 && !tablet.ReadOnly {
return resp, x.Errorf("Group ID is Zero in %+v", tablet)
}

Expand All @@ -548,6 +548,11 @@ func (s *Server) ShouldServe(
// serving.
return tab, nil
}
if tab == nil && tablet.ReadOnly {
// Read-only requests should return an empty tablet instead of asking zero to serve
// the predicate.
return &pb.Tablet{}, nil
}

// Set the tablet to be served by this server's group.
var proposal pb.ZeroProposal
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ message Tablet {
bool force = 3; // Used while moving predicate.
int64 space = 7;
bool remove = 8;
bool read_only = 9; // If true, do not ask zero to serve any tablets.
}

message DirectedEdge {
Expand Down
458 changes: 250 additions & 208 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Function struct {
type SubGraph struct {
ReadTs uint64
Attr string
UnknownAttr bool
Params params
counts []uint32
valueMatrix []*pb.ValueList
Expand Down Expand Up @@ -2025,7 +2026,9 @@ func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) {
return
}
result, err := worker.ProcessTaskOverNetwork(ctx, taskQuery)
if err != nil {
if err != nil && strings.Contains(err.Error(), worker.ErrUnservedTabletMessage) {
sg.UnknownAttr = true
} else if err != nil {
rch <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion query/query0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ func TestQueryVarValOrderError(t *testing.T) {
`
_, err := processQuery(t, context.Background(), query)
require.Error(t, err)
require.Contains(t, err.Error(), "Cannot sort attribute n of type object.")
require.Contains(t, err.Error(), "Cannot sort by unknown attribute n")
}

func TestQueryVarValOrderDesc(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions query/recurse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
return ctx.Err()
}

if start.UnknownAttr {
return nil
}

// Add children back and expand if necessary
if exec, err = expandChildren(ctx, start, startChildren); err != nil {
return err
Expand Down Expand Up @@ -90,6 +94,10 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
}

for _, sg := range exec {
if sg.UnknownAttr {
continue
}

if len(sg.Filters) > 0 {
// We need to do this in case we had some filters.
sg.updateUidMatrix()
Expand Down Expand Up @@ -126,6 +134,9 @@ func (start *SubGraph) expandRecurse(ctx context.Context, maxDepth uint64) error
var out []*SubGraph
var exp []*SubGraph
for _, sg := range exec {
if sg.UnknownAttr == true {
continue
}
if len(sg.DestUIDs.Uids) == 0 {
continue
}
Expand Down
4 changes: 4 additions & 0 deletions query/shortest.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (sg *SubGraph) expandOut(ctx context.Context,
rch <- ctx.Err()
return
default:
if subgraph.UnknownAttr {
continue
}

// Send the destuids in res chan.
for mIdx, fromUID := range subgraph.SrcUIDs.Uids {
for lIdx, toUID := range subgraph.uidMatrix[mIdx].Uids {
Expand Down
6 changes: 4 additions & 2 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

if !groups().ServesTablet(s.Predicate) {
return fmt.Errorf("Group 1 should always serve reserved predicate %s",
if servesTablet, err := groups().ServesTablet(s.Predicate); err != nil {
return err
} else if !servesTablet {
return fmt.Errorf("group 1 should always serve reserved predicate %s",
s.Predicate)
}
}
Expand Down
2 changes: 1 addition & 1 deletion worker/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func export(ctx context.Context, in *pb.ExportRequest) error {
if pk.Attr == "_predicate_" {
return false
}
if !groups().ServesTablet(pk.Attr) {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
// We need to ensure that schema keys are separately identifiable, so they can be
Expand Down
78 changes: 63 additions & 15 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (g *groupi) informZeroAboutTablets() {
failed := false
preds := schema.State().Predicates()
for _, pred := range preds {
if tablet := g.Tablet(pred); tablet == nil {
if tablet, err := g.Tablet(pred); err != nil {
failed = true
glog.V(1).Infof("Error while getting tablet for pred %s: %v", pred, err)
} else if tablet == nil {
failed = true
}
}
Expand Down Expand Up @@ -315,31 +318,76 @@ func (g *groupi) ChecksumsMatch(ctx context.Context) error {
}
}

func (g *groupi) BelongsTo(key string) uint32 {
tablet := g.Tablet(key)
func (g *groupi) BelongsTo(key string) (uint32, error) {
if tablet, err := g.Tablet(key); err != nil {
return 0, err
} else if tablet != nil {
return tablet.GroupId, nil
}
return 0, nil
}

// BelongsToReadOnly acts like BelongsTo except it does not ask zero to serve
// the tablet for key if no group is currently serving it.
func (g *groupi) BelongsToReadOnly(key string) (uint32, error) {
g.RLock()
tablet := g.tablets[key]
g.RUnlock()
if tablet != nil {
return tablet.GroupId
return tablet.GetGroupId(), nil
}
return 0

// We don't know about this tablet. Talk to dgraphzero to find out who is
// serving this tablet.
pl := g.connToZeroLeader()
zc := pb.NewZeroClient(pl.Get())

tablet = &pb.Tablet{
Predicate: key,
ReadOnly: true,
}
out, err := zc.ShouldServe(context.Background(), tablet)
if err != nil {
glog.Errorf("Error while ShouldServe grpc call %v", err)
return 0, err
}
if out.GetGroupId() == 0 {
return 0, nil
}

g.Lock()
defer g.Unlock()
g.tablets[key] = out
return out.GetGroupId(), nil
}

func (g *groupi) ServesTablet(key string) (bool, error) {
if tablet, err := g.Tablet(key); err != nil {
return false, err
} else if tablet != nil && tablet.GroupId == groups().groupId() {
return true, nil
}
return false, nil
}

func (g *groupi) ServesTablet(key string) bool {
tablet := g.Tablet(key)
if tablet != nil && tablet.GroupId == groups().groupId() {
return true
// ServesTabletReadOnly acts like ServesTablet except it does not ask zero to
// serve the tablet for key if no group is currently serving it.
func (g *groupi) ServesTabletReadOnly(key string) (bool, error) {
gid, err := g.BelongsToReadOnly(key)
if err != nil {
return false, err
}
return false
return gid == groups().groupId(), nil
}

// Do not modify the returned Tablet
// TODO: This should return error.
func (g *groupi) Tablet(key string) *pb.Tablet {
func (g *groupi) Tablet(key string) (*pb.Tablet, error) {
// TODO: Remove all this later, create a membership state and apply it
g.RLock()
tablet, ok := g.tablets[key]
g.RUnlock()
if ok {
return tablet
return tablet, nil
}

// We don't know about this tablet.
Expand All @@ -351,7 +399,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet {
out, err := zc.ShouldServe(context.Background(), tablet)
if err != nil {
glog.Errorf("Error while ShouldServe grpc call %v", err)
return nil
return nil, err
}
g.Lock()
g.tablets[key] = out
Expand All @@ -360,7 +408,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet {
if out.GroupId == groups().groupId() {
glog.Infof("Serving tablet for: %v\n", key)
}
return out
return out, nil
}

func (g *groupi) HasMeInState() bool {
Expand Down
30 changes: 22 additions & 8 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
)

var (
errUnservedTablet = x.Errorf("Tablet isn't being served by this instance.")
ErrUnservedTabletMessage = "Tablet isn't being served by this instance"
errUnservedTablet = x.Errorf(ErrUnservedTabletMessage)
)

func isStarAll(v []byte) bool {
Expand Down Expand Up @@ -111,10 +112,12 @@ func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uin
}

func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error {
if !groups().ServesTablet(update.Predicate) {
tablet := groups().Tablet(update.Predicate)
if tablet, err := groups().Tablet(update.Predicate); err != nil {
return err
} else if tablet.GetGroupId() != groups().groupId() {
return x.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet)
}

if err := checkSchema(update); err != nil {
return err
}
Expand Down Expand Up @@ -393,10 +396,14 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re

// populateMutationMap populates a map from group id to the mutation that
// should be sent to that group.
func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
func populateMutationMap(src *pb.Mutations) (map[uint32]*pb.Mutations, error) {
mm := make(map[uint32]*pb.Mutations)
for _, edge := range src.Edges {
gid := groups().BelongsTo(edge.Attr)
gid, err := groups().BelongsTo(edge.Attr)
if err != nil {
return nil, err
}

mu := mm[gid]
if mu == nil {
mu = &pb.Mutations{GroupId: gid}
Expand All @@ -405,7 +412,11 @@ func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
mu.Edges = append(mu.Edges, edge)
}
for _, schema := range src.Schema {
gid := groups().BelongsTo(schema.Predicate)
gid, err := groups().BelongsTo(schema.Predicate)
if err != nil {
return nil, err
}

mu := mm[gid]
if mu == nil {
mu = &pb.Mutations{GroupId: gid}
Expand All @@ -423,7 +434,7 @@ func populateMutationMap(src *pb.Mutations) map[uint32]*pb.Mutations {
mu.DropAll = true
}
}
return mm
return mm, nil
}

func commitOrAbort(ctx context.Context, startTs, commitTs uint64) error {
Expand All @@ -447,7 +458,10 @@ func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, e
defer span.End()

tctx := &api.TxnContext{StartTs: m.StartTs}
mutationMap := populateMutationMap(m)
mutationMap, err := populateMutationMap(m)
if err != nil {
return tctx, err
}

resCh := make(chan res, len(mutationMap))
for gid, mu := range mutationMap {
Expand Down
4 changes: 3 additions & 1 deletion worker/predicate_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (w *grpcWorker) MovePredicate(ctx context.Context,
if err := posting.Oracle().WaitForTs(ctx, in.TxnTs); err != nil {
return &emptyPayload, x.Errorf("While waiting for txn ts: %d. Error: %v", in.TxnTs, err)
}
if !groups().ServesTablet(in.Predicate) {
if servesTablet, err := groups().ServesTablet(in.Predicate); err != nil {
return &emptyPayload, err
} else if !servesTablet {
return &emptyPayload, errUnservedTablet
}

Expand Down
5 changes: 3 additions & 2 deletions worker/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
var noTimeout bool

checkTablet := func(pred string) error {
if tablet := groups().Tablet(pred); tablet == nil ||
tablet.GroupId != groups().groupId() {
if tablet, err := groups().Tablet(pred); err != nil {
return err
} else if tablet == nil || tablet.GroupId != groups().groupId() {
return errUnservedTablet
}
return nil
Expand Down
20 changes: 15 additions & 5 deletions worker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func getSchema(ctx context.Context, s *pb.SchemaRequest) (*pb.SchemaResult, erro
for _, attr := range predicates {
// This can happen after a predicate is moved. We don't delete predicate from schema state
// immediately. So lets ignore this predicate.
if !groups().ServesTablet(attr) {
if servesTablet, err := groups().ServesTabletReadOnly(attr); err != nil {
return nil, err
} else if !servesTablet {
continue
}

if schemaNode := populateSchema(attr, fields); schemaNode != nil {
result.Schema = append(result.Schema, schemaNode)
}
Expand Down Expand Up @@ -111,9 +114,13 @@ func populateSchema(attr string, fields []string) *api.SchemaNode {

// addToSchemaMap groups the predicates by group id, if list of predicates is
// empty then it adds all known groups
func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaRequest) {
func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaRequest) error {
for _, attr := range schema.Predicates {
gid := groups().BelongsTo(attr)
gid, err := groups().BelongsTo(attr)
if err != nil {
return err
}

s := schemaMap[gid]
if s == nil {
s = &pb.SchemaRequest{GroupId: gid}
Expand All @@ -123,7 +130,7 @@ func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaReq
s.Predicates = append(s.Predicates, attr)
}
if len(schema.Predicates) > 0 {
return
return nil
}
// TODO: Janardhan - node shouldn't serve any request until membership
// information is synced, should we fail health check till then ?
Expand All @@ -139,6 +146,7 @@ func addToSchemaMap(schemaMap map[uint32]*pb.SchemaRequest, schema *pb.SchemaReq
schemaMap[gid] = s
}
}
return nil
}

// If the current node serves the group serve the schema or forward
Expand Down Expand Up @@ -174,7 +182,9 @@ func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) ([]*api

// Map of groupd id => Predicates for that group.
schemaMap := make(map[uint32]*pb.SchemaRequest)
addToSchemaMap(schemaMap, schema)
if err := addToSchemaMap(schemaMap, schema); err != nil {
return nil, err
}

results := make(chan resultErr, len(schemaMap))
var schemaNodes []*api.SchemaNode
Expand Down
Loading