From 3eebed57d8990634a26e4840125bfcc2986b6113 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 15 Nov 2018 13:56:51 -0800 Subject: [PATCH 01/12] Trial: Do not purge anything --- dgraph/cmd/zero/oracle.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index d71fff1139f..d655caecc29 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -84,6 +84,10 @@ func (o *Oracle) purgeBelow(minTs uint64) { o.Lock() defer o.Unlock() + // TODO: HACK. Remove this later. + glog.Infof("Not purging below: %d", minTs) + return + // Dropping would be cheaper if abort/commits map is sharded for ts := range o.commits { if ts < minTs { From a9d947641fa8ca62421a62062b79a38c7473ac63 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 16 Nov 2018 19:56:32 -0800 Subject: [PATCH 02/12] More debugging, more tracing for Jepsen. --- dgraph/cmd/alpha/run.go | 3 +++ dgraph/cmd/debug/run.go | 56 +++++++++++++++++++++++++++++++++-------- edgraph/server.go | 23 ++++++++++++++++- worker/groups.go | 2 +- 4 files changed, 71 insertions(+), 13 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index b604998fd86..61b60bfa8ec 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -431,6 +431,9 @@ func run() { } } + // TODO: Set this to the trace sampling rate set in flags. + otrace.ApplyConfig(otrace.Config{DefaultSampler: otrace.AlwaysSample()}) + // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). schema.Init(edgraph.State.Pstore) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 79fb7e9c83c..be17e0fd95f 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -70,6 +70,19 @@ func init() { flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.") } +func toInt(o *pb.Posting) int { + from := types.Val{ + Tid: types.TypeID(o.ValType), + Value: o.Value, + } + out, err := types.Convert(from, types.StringID) + x.Check(err) + val := out.Value.(string) + a, err := strconv.Atoi(val) + x.Check(err) + return a +} + func readAmount(txn *badger.Txn, uid uint64) int { iopt := badger.DefaultIteratorOptions iopt.AllVersions = true @@ -86,16 +99,7 @@ func readAmount(txn *badger.Txn, uid uint64) int { var times int var amount int err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error { - from := types.Val{ - Tid: types.TypeID(o.ValType), - Value: o.Value, - } - out, err := types.Convert(from, types.StringID) - x.Check(err) - val := out.Value.(string) - a, err := strconv.Atoi(val) - x.Check(err) - amount = a + amount = toInt(o) times++ return nil }) @@ -192,6 +196,12 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) { itr := txn.NewIterator(badger.DefaultIteratorOptions) defer itr.Close() + type account struct { + Key int + Amt int + } + keys := make(map[uint64]*account) + var buf bytes.Buffer fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs) for itr.Rewind(); itr.Valid(); itr.Next() { @@ -204,15 +214,39 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) { if !pk.IsData() || pk.Attr == "_predicate_" { continue } - fmt.Fprintf(&buf, "\nkey: %+v hex: %x\n", pk, item.Key()) + + var acc *account + if pk.Attr == "key_0" || pk.Attr == "amount_0" { + var has bool + acc, has = keys[pk.Uid] + if !has { + acc = &account{} + keys[pk.Uid] = acc + } + } + fmt.Fprintf(&buf, " key: %+v hex: %x\n", pk, item.Key()) val, err := item.ValueCopy(nil) x.Check(err) var plist 
pb.PostingList x.Check(plist.Unmarshal(val)) + x.AssertTrue(len(plist.Postings) <= 1) + var num int for _, p := range plist.Postings { + num = toInt(p) appendPosting(&buf, p) } + if num > 0 && acc != nil { + switch pk.Attr { + case "key_0": + acc.Key = num + case "amount_0": + acc.Amt = num + } + } + } + for uid, acc := range keys { + fmt.Fprintf(&buf, "Uid: %d Key: %d Amount: %d\n", uid, acc.Key, acc.Amt) } fmt.Println(buf.String()) } diff --git a/edgraph/server.go b/edgraph/server.go index 1f1fd6f8cc9..87770c0fa54 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -66,6 +66,8 @@ type ServerState struct { // TODO(tzdybal) - remove global var State ServerState +// var startOpt = otrace.WithSampler(otrace.AlwaysSample()) + func InitServerState() { Config.validate() @@ -338,6 +340,12 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign return resp, err } + if len(mu.SetJson) > 0 { + span.Annotatef(nil, "Got JSON Mutation: %s", mu.SetJson) + } else if len(mu.SetNquads) > 0 { + span.Annotatef(nil, "Got NQuad Mutation: %s", mu.SetNquads) + } + if !isMutationAllowed(ctx) { return nil, x.Errorf("No mutations allowed.") } @@ -383,8 +391,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign Edges: edges, StartTs: mu.StartTs, } - span.Annotate(nil, "Applying mutations") + span.Annotatef(nil, "Applying mutations: %+v", m) resp.Context, err = query.ApplyMutations(ctx, m) + span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err) if !mu.CommitNow { if err == y.ErrConflict { err = status.Error(codes.FailedPrecondition, err.Error()) @@ -494,6 +503,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons return resp, err } resp.Json = json + span.Annotatef(nil, "Response = %s", json) gl := &api.Latency{ ParsingNs: uint64(l.Parsing.Nanoseconds()), @@ -520,6 +530,17 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx if tc.StartTs == 0 { return &api.TxnContext{}, fmt.Errorf("StartTs cannot be zero while committing a transaction.") } + + span.Annotatef(nil, "Txn Context received: %+v", tc) + var attrs []otrace.Attribute + for _, key := range tc.Keys { + attrs = append(attrs, otrace.StringAttribute("conflict", key)) + } + for _, pred := range tc.Preds { + attrs = append(attrs, otrace.StringAttribute("pred", pred)) + } + span.Annotate(attrs, "Conflict Keys and Preds") + annotateStartTs(span, tc.StartTs) commitTs, err := worker.CommitOverNetwork(ctx, tc) if err == y.ErrAborted { diff --git a/worker/groups.go b/worker/groups.go index 7937aae18f2..19b2c553c22 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -851,7 +851,7 @@ func (g *groupi) processOracleDeltaStream() { return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs }) elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) - if glog.V(3) { + if glog.V(2) { glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) } for { From 3fc12ce4596e8bd7da586101892929a3e5bf0270 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Sun, 18 Nov 2018 15:16:05 -0800 Subject: [PATCH 03/12] Opencensus in Zero. 
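This wires an OpenCensus Jaeger exporter and an ocgrpc stats handler into Zero's gRPC server, mirroring the handler Alpha already installs in serveGRPC. For reference, a minimal, self-contained sketch of the same wiring under stated assumptions: the listener address/port below is illustrative, and the agent/collector endpoints are the Jaeger defaults noted in the code comment in this patch, not values the patch hardcodes.

    package main

    import (
        "log"
        "net"

        "go.opencensus.io/exporter/jaeger"
        "go.opencensus.io/plugin/ocgrpc"
        otrace "go.opencensus.io/trace"
        "google.golang.org/grpc"
    )

    func main() {
        // Register a Jaeger exporter so sampled spans leave the process.
        // Endpoints here are the documented Jaeger defaults (illustrative).
        je, err := jaeger.NewExporter(jaeger.Options{
            AgentEndpoint: "localhost:6831",
            Endpoint:      "http://localhost:14268",
            ServiceName:   "dgraph.zero",
        })
        if err != nil {
            log.Fatalf("Failed to create the Jaeger exporter: %v", err)
        }
        otrace.RegisterExporter(je)

        // The ocgrpc stats handler opens a server span per RPC; sampling
        // everything matches the debugging intent of this series.
        handler := &ocgrpc.ServerHandler{
            IsPublicEndpoint: false,
            StartOptions: otrace.StartOptions{
                Sampler: otrace.AlwaysSample(),
            },
        }
        srv := grpc.NewServer(grpc.StatsHandler(handler))

        l, err := net.Listen("tcp", "localhost:5080") // illustrative port
        if err != nil {
            log.Fatalf("While setting up listener: %v", err)
        }
        log.Fatal(srv.Serve(l))
    }
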
--- dgraph/cmd/alpha/run.go | 2 +- dgraph/cmd/debug/run.go | 6 ++++-- dgraph/cmd/zero/assign.go | 1 + dgraph/cmd/zero/oracle.go | 14 ++++++++++++++ dgraph/cmd/zero/raft.go | 2 ++ dgraph/cmd/zero/run.go | 33 ++++++++++++++++++++++++++++++++- edgraph/server.go | 14 ++++++++++---- worker/groups.go | 9 ++++++++- 8 files changed, 72 insertions(+), 9 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 61b60bfa8ec..161b58a4d91 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -247,7 +247,7 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) { } handler := &ocgrpc.ServerHandler{ - IsPublicEndpoint: true, + IsPublicEndpoint: false, StartOptions: otrace.StartOptions{ Sampler: otrace.AlwaysSample(), }, diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index be17e0fd95f..31e9cbcd568 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -79,7 +79,9 @@ func toInt(o *pb.Posting) int { x.Check(err) val := out.Value.(string) a, err := strconv.Atoi(val) - x.Check(err) + if err != nil { + return 0 + } return a } @@ -246,7 +248,7 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) { } } for uid, acc := range keys { - fmt.Fprintf(&buf, "Uid: %d Key: %d Amount: %d\n", uid, acc.Key, acc.Amt) + fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt) } fmt.Println(buf.String()) } diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go index 40a4c691731..e2360a151ca 100644 --- a/dgraph/cmd/zero/assign.go +++ b/dgraph/cmd/zero/assign.go @@ -41,6 +41,7 @@ func (s *Server) updateLeases() { s.nextLeaseId = s.state.MaxLeaseId + 1 s.nextTxnTs = s.state.MaxTxnTs + 1 startTs = s.nextTxnTs + glog.Infof("Updated Lease id: %d. Txn Ts: %d", s.nextLeaseId, s.nextTxnTs) s.Unlock() s.orc.updateStartTxnTs(startTs) } diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index d655caecc29..daac632fe38 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" ) @@ -67,6 +68,7 @@ func (o *Oracle) updateStartTxnTs(ts uint64) { o.keyCommit = make(map[string]uint64) } +// TODO: This should be done during proposal application for Txn status. func (o *Oracle) hasConflict(src *api.TxnContext) bool { // This transaction was started before I became leader. if src.StartTs < o.startTxnTs { @@ -296,6 +298,8 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { } func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { + span := otrace.FromContext(ctx) + span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(src.StartTs))}, "") if src.Aborted { return s.proposeTxn(ctx, src) } @@ -305,6 +309,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { conflict := s.orc.hasConflict(src) s.orc.RUnlock() if conflict { + span.Annotate(nil, "Oracle found conflict") src.Aborted = true return s.proposeTxn(ctx, src) } @@ -332,6 +337,7 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { for pred := range preds { tablet := s.ServingTablet(pred) if tablet == nil || tablet.GetReadOnly() { + span.Annotate(nil, "Tablet is readonly. 
Aborting.") src.Aborted = true return s.proposeTxn(ctx, src) } @@ -345,18 +351,26 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { src.CommitTs = assigned.StartId // Mark the transaction as done, irrespective of whether the proposal succeeded or not. defer s.orc.doneUntil.Done(src.CommitTs) + span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))}, + "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src) if err := s.orc.commit(src); err != nil { + span.Annotatef(nil, "Found a conflict. Aborting.") src.Aborted = true } // Propose txn should be used to set watermark as done. return s.proposeTxn(ctx, src) } +var startOpt = otrace.WithSampler(otrace.AlwaysSample()) + func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) { if ctx.Err() != nil { return nil, ctx.Err() } + ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort", startOpt) + defer span.End() + if !s.Node.AmLeader() { return nil, x.Errorf("Only leader can decide to commit or abort") } diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index e0071edc60e..b1cea02b017 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -526,6 +526,8 @@ func (n *node) Run() { // TODO: Should we move this to the top? if rd.SoftState != nil { if rd.RaftState == raft.StateLeader && !leader { + // I've become the leader. + glog.Infoln("I've become the leader, updating leases") n.server.updateLeases() } leader = rd.RaftState == raft.StateLeader diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 1da356bfde6..5c909aff01b 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -27,6 +27,9 @@ import ( "syscall" "time" + "go.opencensus.io/exporter/jaeger" + "go.opencensus.io/plugin/ocgrpc" + otrace "go.opencensus.io/trace" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -83,6 +86,9 @@ instances to achieve high-availability. 
flag.StringP("wal", "w", "zw", "Directory storing WAL.") flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.") flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.") + + flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") + flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") } func setupListener(addr string, port int, kind string) (listener net.Listener, err error) { @@ -98,10 +104,35 @@ type state struct { } func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) { + if agent := Zero.Conf.GetString("jaeger.agent"); len(agent) > 0 { + // Port details: https://www.jaegertracing.io/docs/getting-started/ + // Default endpoints are: + // agentEndpointURI := "localhost:6831" + // collectorEndpointURI := "http://localhost:14268" + collector := Zero.Conf.GetString("jaeger.collector") + je, err := jaeger.NewExporter(jaeger.Options{ + AgentEndpoint: agent, + Endpoint: collector, + ServiceName: "dgraph.zero", + }) + if err != nil { + log.Fatalf("Failed to create the Jaeger exporter: %v", err) + } + // And now finally register it as a Trace Exporter + otrace.RegisterExporter(je) + } + + handler := &ocgrpc.ServerHandler{ + IsPublicEndpoint: false, + StartOptions: otrace.StartOptions{ + Sampler: otrace.AlwaysSample(), + }, + } s := grpc.NewServer( grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), - grpc.MaxConcurrentStreams(1000)) + grpc.MaxConcurrentStreams(1000), + grpc.StatsHandler(handler)) rc := pb.RaftContext{Id: opts.nodeId, Addr: opts.myAddr, Group: 0} m := conn.NewNode(&rc, store) diff --git a/edgraph/server.go b/edgraph/server.go index 87770c0fa54..7e44c9d4c6f 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -66,7 +66,7 @@ type ServerState struct { // TODO(tzdybal) - remove global var State ServerState -// var startOpt = otrace.WithSampler(otrace.AlwaysSample()) +var startOpt = otrace.WithSampler(otrace.AlwaysSample()) func InitServerState() { Config.validate() @@ -332,7 +332,13 @@ func annotateStartTs(span *otrace.Span, ts uint64) { } func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) { - ctx, span := otrace.StartSpan(ctx, "Server.Mutate") + // parent := otrace.FromContext(ctx) + // var sp otrace.SpanContext + // if parent != nil { + // sp = parent.SpanContext() + // } + ctx, span := otrace.StartSpan(ctx, "Server.Mutate", startOpt) + // ctx, span := otrace.StartSpanWithRemoteParent(ctx, "Server.Mutate", sp) defer span.End() resp = &api.Assigned{} @@ -442,7 +448,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons if glog.V(3) { glog.Infof("Got a query: %+v", req) } - ctx, span := otrace.StartSpan(ctx, "Server.Query") + ctx, span := otrace.StartSpan(ctx, "Server.Query", startOpt) defer span.End() if err := x.HealthCheck(); err != nil { @@ -516,7 +522,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons } func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) { - ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort") + ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort", startOpt) defer span.End() if err := x.HealthCheck(); err != nil { diff --git a/worker/groups.go b/worker/groups.go index 19b2c553c22..0b5463d614f 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -852,7 +852,14 @@ func (g *groupi) processOracleDeltaStream() { }) elog.Printf("Batched %d updates. 
Proposing Delta: %v.", batch, delta) if glog.V(2) { - glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta) + glog.Infof("Batched %d updates. Proposing Deltas:", batch) + for _, txn := range delta.Txns { + if txn.CommitTs == 0 { + glog.Infof("Aborted: %d", txn.StartTs) + } else { + glog.Infof("Committed: %d -> %d", txn.StartTs, txn.CommitTs) + } + } } for { // Block forever trying to propose this. From e45ab19ad094cc6f4f8b7b1e4e27855e73605f11 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 19 Nov 2018 16:44:52 -0800 Subject: [PATCH 04/12] Some more annotates --- dgraph/cmd/debug/run.go | 2 +- worker/groups.go | 2 +- worker/task.go | 7 ++++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 31e9cbcd568..848ce2f6b1e 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -161,7 +161,7 @@ func seekTotal(db *badger.DB, readTs uint64) int { var total int for uid, key := range keys { a := readAmount(txn, uid) - fmt.Printf("uid: %-5d key: %d amount: %d\n", uid, key, a) + fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a) total += a } fmt.Printf("Total @ %d = %d\n", readTs, total) diff --git a/worker/groups.go b/worker/groups.go index 0b5463d614f..5889cb5172d 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -852,7 +852,7 @@ func (g *groupi) processOracleDeltaStream() { }) elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) if glog.V(2) { - glog.Infof("Batched %d updates. Proposing Deltas:", batch) + glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:", batch, delta.MaxAssigned) for _, txn := range delta.Txns { if txn.CommitTs == 0 { glog.Infof("Aborted: %d", txn.StartTs) diff --git a/worker/task.go b/worker/task.go index cdaf3d343e4..9e2aca9f0a1 100644 --- a/worker/task.go +++ b/worker/task.go @@ -592,13 +592,14 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti // processTask processes the query, accumulates and returns the result. func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { span := otrace.FromContext(ctx) - span.Annotate(nil, "Waiting for startTs") + span.Annotatef(nil, "Waiting for startTs: %d", q.ReadTs) if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil { return &emptyResult, err } if span != nil { - span.Annotatef(nil, "Done waiting for maxPending to catch up for Attr %q, readTs: %d\n", - q.Attr, q.ReadTs) + maxAssigned := posting.Oracle().MaxAssigned() + span.Annotatef(nil, "Done waiting for maxAssigned to catch up for Attr %q, readTs: %d max: %d\n", + q.Attr, q.ReadTs, maxAssigned) } // If a group stops serving tablet and it gets partitioned away from group zero, then it From 712bcd42b6a09ed2fd3236490740ae2d68f88cd7 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 19 Nov 2018 19:32:28 -0800 Subject: [PATCH 05/12] One fix for Jepsen test, which ensures that a context deadline cannot just cancel a txn proposal. Need some refactoring of how Zero gets membership updates, now that we need Zero to not forward proposals to the leader. 
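The crux of the fix (see the oracle.go hunk below): once Zero has decided a transaction's fate, the status proposal is made on a fresh context with no timeout, so an expired caller deadline can no longer cancel it mid-flight. A standalone sketch of the pattern under stated assumptions — proposeTxnStatus and the propose callback are illustrative names, not code from this patch:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // proposeTxnStatus honors the caller's deadline only until the txn's
    // fate is decided. The proposal itself runs on a context without a
    // timeout: if a decided commit were dropped, a MaxAssigned higher than
    // that commit could go out, and newer txns would read older data.
    func proposeTxnStatus(userCtx context.Context, propose func(context.Context) error) error {
        if err := userCtx.Err(); err != nil {
            return err // caller gave up before anything was decided
        }
        // Detach from the user context; retries past this point may block
        // for as long as it takes the proposal to go through.
        return propose(context.Background())
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
        defer cancel()
        time.Sleep(2 * time.Millisecond) // let the caller's deadline lapse
        err := proposeTxnStatus(ctx, func(context.Context) error {
            return errors.New("unreachable: deadline already expired")
        })
        fmt.Println(err) // prints: context deadline exceeded
    }
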
--- dgraph/cmd/zero/oracle.go | 16 ++++++++++++++ dgraph/cmd/zero/raft.go | 3 +++ dgraph/cmd/zero/zero.go | 6 +----- worker/groups.go | 45 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 63 insertions(+), 7 deletions(-) diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index daac632fe38..d007db03cc7 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -203,6 +203,7 @@ func (o *Oracle) sendDeltasToSubscribers() { // Don't goto slurp_loop, because it would break from select immediately. } + glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta) o.Lock() for id, ch := range o.subscribers { select { @@ -279,6 +280,17 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { CommitTs: src.CommitTs, Aborted: src.Aborted, } + + // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should + // happen, irrespective of what the user context timeout might be. We check for it before + // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes + // through. Otherwise, we should block here forever. + // If we don't do this, we'll see txn violations in Jepsen, because we'll send out a MaxAssigned + // higher than a commit, which would cause newer txns to see older data. + ctx = context.Background() // Use a new context with no timeout. + + // If this node stops being the leader, we want this proposal to not be forwarded to the leader, + // and get aborted. if err := s.Node.proposeAndWait(ctx, &zp); err != nil { return err } @@ -358,6 +370,10 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { span.Annotatef(nil, "Found a conflict. Aborting.") src.Aborted = true } + if err := ctx.Err(); err != nil { + span.Annotatef(nil, "Aborting txn due to context timing out.") + src.Aborted = true + } // Propose txn should be used to set watermark as done. return s.proposeTxn(ctx, src) } diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index b1cea02b017..d0139a350bf 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -119,6 +119,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er for err == errInternalRetry { err = propose(timeout) timeout *= 2 // Exponential backoff + if timeout > time.Minute { + timeout = 32 * time.Second + } } return err } diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index 3ea87c39c79..5f38e3cc990 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -247,11 +247,7 @@ func (s *Server) updateZeroLeader() { defer s.Unlock() leader := s.Node.Raft().Status().Lead for _, m := range s.state.Zeros { - if m.Id == leader { - m.Leader = true - } else { - m.Leader = false - } + m.Leader = m.Id == leader } } diff --git a/worker/groups.go b/worker/groups.go index 5889cb5172d..57c09e630c0 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -492,6 +492,44 @@ func (g *groupi) triggerMembershipSync() { } } +const connBaseDelay = 100 * time.Millisecond + +func (g *groupi) connToZeroLeader() *conn.Pool { + pl := g.Leader(0) + if pl != nil { + return pl + } + glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...") + + // No leader found. Let's get the latest membership state from Zero. + delay := connBaseDelay + maxHalfDelay := time.Second + for { // Keep on retrying.
See: https://github.com/dgraph-io/dgraph/issues/2289 + time.Sleep(delay) + if delay <= maxHalfDelay { + delay *= 2 + } + pl := g.AnyServer(0) + if pl == nil { + glog.V(1).Infof("No healthy Zero server found. Retrying...") + continue + } + zc := pb.NewZeroClient(pl.Get()) + connState, err := zc.Connect(gr.ctx, &pb.Member{ClusterInfoOnly: true}) + if err != nil || connState == nil { + glog.V(1).Infof("While retrieving Zero leader info. Error: %v. Retrying...", err) + continue + } + for _, mz := range connState.State.GetZeros() { + if mz.Leader { + pl := conn.Get().Connect(mz.GetAddr()) + return pl + } + } + glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...") + } +} + // TODO: This function needs to be refactored into smaller functions. It gets hard to reason about. func (g *groupi) periodicMembershipUpdate() { defer g.closer.Done() // CLOSER:1 @@ -514,14 +552,14 @@ START: default: } - pl := g.AnyServer(0) + pl := g.connToZeroLeader() // We should always have some connection to dgraphzero. if pl == nil { glog.Warningln("Membership update: No Zero server known.") time.Sleep(time.Second) goto START } - glog.Infof("Got address of a Zero server: %s", pl.Addr) + glog.Infof("Got address of a Zero leader: %s", pl.Addr) c := pb.NewZeroClient(pl.Get()) ctx, cancel := context.WithCancel(context.Background()) @@ -593,6 +631,9 @@ OUTER: break OUTER } case <-slowTicker.C: + // TODO: Zero should have two different RPCs. One for receiving updates, and one for + // sending updates. + // dgraphzero just adds to the map so check that no data is present for the tablet // before we remove it to avoid the race condition where a tablet is added recently // and mutation has not been persisted to disk. From 384b437138961dfa32dc3af60290a9341f4e01c9 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 19 Nov 2018 19:34:12 -0800 Subject: [PATCH 06/12] Update Raft lib, so we have access to the feature to disallow Raft proposal forwarding. --- vendor/github.com/coreos/etcd/raft/README.md | 12 +- vendor/github.com/coreos/etcd/raft/node.go | 6 +- .../github.com/coreos/etcd/raft/progress.go | 7 +- vendor/github.com/coreos/etcd/raft/raft.go | 275 ++++++++++++++---- vendor/github.com/coreos/etcd/raft/rawnode.go | 4 +- .../github.com/coreos/etcd/raft/read_only.go | 2 +- vendor/github.com/coreos/etcd/raft/status.go | 22 +- vendor/vendor.json | 10 +- 8 files changed, 254 insertions(+), 84 deletions(-) diff --git a/vendor/github.com/coreos/etcd/raft/README.md b/vendor/github.com/coreos/etcd/raft/README.md index f485b839771..fde22b16519 100644 --- a/vendor/github.com/coreos/etcd/raft/README.md +++ b/vendor/github.com/coreos/etcd/raft/README.md @@ -25,12 +25,12 @@ This raft implementation is a full feature implementation of Raft protocol. 
Feat - Membership changes - Leadership transfer extension - Efficient linearizable read-only queries served by both the leader and followers - - leader checks with quorum and bypasses Raft log before processing read-only queries - - followers asks leader to get a safe read index before processing read-only queries + - leader checks with quorum and bypasses Raft log before processing read-only queries + - followers asks leader to get a safe read index before processing read-only queries - More efficient lease-based linearizable read-only queries served by both the leader and followers - - leader bypasses Raft log and processing read-only queries locally - - followers asks leader to get a safe read index before processing read-only queries - - this approach relies on the clock of the all the machines in raft group + - leader bypasses Raft log and processing read-only queries locally + - followers asks leader to get a safe read index before processing read-only queries + - this approach relies on the clock of the all the machines in raft group This raft implementation also includes a few optional enhancements: @@ -112,7 +112,7 @@ After creating a Node, the user has a few responsibilities: First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2. -1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded. +1. Write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic writes then all of them can be written together. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded. 2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make leader write to disk in parallel with its followers (as explained at section 10.2.1 in Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop. diff --git a/vendor/github.com/coreos/etcd/raft/node.go b/vendor/github.com/coreos/etcd/raft/node.go index 5da1c1193b2..33a9db84001 100644 --- a/vendor/github.com/coreos/etcd/raft/node.go +++ b/vendor/github.com/coreos/etcd/raft/node.go @@ -15,10 +15,10 @@ package raft import ( + "context" "errors" pb "github.com/coreos/etcd/raft/raftpb" - "golang.org/x/net/context" ) type SnapshotStatus int @@ -319,7 +319,7 @@ func (n *node) run(r *raft) { r.Step(m) case m := <-n.recvc: // filter out response message from unknown From. 
- if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { + if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) // raft never returns an error } case cc := <-n.confc: @@ -334,6 +334,8 @@ func (n *node) run(r *raft) { switch cc.Type { case pb.ConfChangeAddNode: r.addNode(cc.NodeID) + case pb.ConfChangeAddLearnerNode: + r.addLearner(cc.NodeID) case pb.ConfChangeRemoveNode: // block incoming proposal when local node is // removed diff --git a/vendor/github.com/coreos/etcd/raft/progress.go b/vendor/github.com/coreos/etcd/raft/progress.go index 77c7b52efe3..ef3787db65d 100644 --- a/vendor/github.com/coreos/etcd/raft/progress.go +++ b/vendor/github.com/coreos/etcd/raft/progress.go @@ -48,6 +48,7 @@ type Progress struct { // When in ProgressStateSnapshot, leader should have sent out snapshot // before and stops sending any replication message. State ProgressStateType + // Paused is used in ProgressStateProbe. // When Paused is true, raft should pause sending replication message to this peer. Paused bool @@ -76,6 +77,9 @@ type Progress struct { // be freed by calling inflights.freeTo with the index of the last // received entry. ins *inflights + + // IsLearner is true if this progress is tracked for a learner. + IsLearner bool } func (pr *Progress) resetState(state ProgressStateType) { @@ -243,7 +247,8 @@ func (in *inflights) freeTo(to uint64) { return } - i, idx := 0, in.start + idx := in.start + var i int for i = 0; i < in.count; i++ { if to < in.buffer[idx] { // found the first large inflight break diff --git a/vendor/github.com/coreos/etcd/raft/raft.go b/vendor/github.com/coreos/etcd/raft/raft.go index 29f20398203..b4c0f0248ca 100644 --- a/vendor/github.com/coreos/etcd/raft/raft.go +++ b/vendor/github.com/coreos/etcd/raft/raft.go @@ -116,6 +116,10 @@ type Config struct { // used for testing right now. peers []uint64 + // learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster. + // learners only receives entries from the leader node. It does not vote or promote itself. + learners []uint64 + // ElectionTick is the number of Node.Tick invocations that must pass between // elections. That is, if a follower does not receive any message from the // leader of current term before ElectionTick has elapsed, it will become @@ -171,11 +175,22 @@ type Config struct { // If the clock drift is unbounded, leader might keep the lease longer than it // should (clock can move backward/pause without any bound). ReadIndex is not safe // in that case. + // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. ReadOnlyOption ReadOnlyOption // Logger is the logger used for raft log. For multinode which can host // multiple raft group, each raft group can have its own logger Logger Logger + + // DisableProposalForwarding set to true means that followers will drop + // proposals, rather than forwarding them to the leader. One use case for + // this feature would be in a situation where the Raft leader is used to + // compute the data of a proposal, for example, adding a timestamp from a + // hybrid logical clock to data in a monotonically increasing way. Forwarding + // should be disabled to prevent a follower with an innaccurate hybrid + // logical clock from assigning the timestamp and then forwarding the data + // to the leader. 
+ DisableProposalForwarding bool } func (c *Config) validate() error { @@ -203,6 +218,10 @@ func (c *Config) validate() error { c.Logger = raftLogger } + if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { + return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") + } + return nil } @@ -220,9 +239,13 @@ type raft struct { maxInflight int maxMsgSize uint64 prs map[uint64]*Progress + learnerPrs map[uint64]*Progress state StateType + // isLearner is true if the local raft node is a learner. + isLearner bool + votes map[uint64]bool msgs []pb.Message @@ -256,6 +279,7 @@ type raft struct { // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. randomizedElectionTimeout int + disableProposalForwarding bool tick func() step stepFunc @@ -273,32 +297,47 @@ func newRaft(c *Config) *raft { panic(err) // TODO(bdarnell) } peers := c.peers - if len(cs.Nodes) > 0 { - if len(peers) > 0 { + learners := c.learners + if len(cs.Nodes) > 0 || len(cs.Learners) > 0 { + if len(peers) > 0 || len(learners) > 0 { // TODO(bdarnell): the peers argument is always nil except in // tests; the argument should be removed and these tests should be // updated to specify their nodes through a snapshot. - panic("cannot specify both newRaft(peers) and ConfState.Nodes)") + panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)") } peers = cs.Nodes + learners = cs.Learners } r := &raft{ - id: c.ID, - lead: None, - raftLog: raftlog, - maxMsgSize: c.MaxSizePerMsg, - maxInflight: c.MaxInflightMsgs, - prs: make(map[uint64]*Progress), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: c.MaxSizePerMsg, + maxInflight: c.MaxInflightMsgs, + prs: make(map[uint64]*Progress), + learnerPrs: make(map[uint64]*Progress), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} } + for _, p := range learners { + if _, ok := r.prs[p]; ok { + panic(fmt.Sprintf("node %x is in both learner and peer list", p)) + } + r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + if r.id == p { + r.isLearner = true + } + } + if !isHardStateEqual(hs, emptyState) { r.loadState(hs) } @@ -332,10 +371,13 @@ func (r *raft) hardState() pb.HardState { func (r *raft) quorum() int { return len(r.prs)/2 + 1 } func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)) + nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs)) for id := range r.prs { nodes = append(nodes, id) } + for id := range r.learnerPrs { + nodes = append(nodes, id) + } sort.Sort(uint64Slice(nodes)) return nodes } @@ -343,10 +385,20 @@ func (r *raft) nodes() []uint64 { // send persists state to stable storage and then sends to its mailbox. 
func (r *raft) send(m pb.Message) { m.From = r.id - if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { + if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { if m.Term == 0 { - // PreVote RPCs are sent at a term other than our actual term, so the code - // that sends these messages is responsible for setting the term. + // All {pre-,}campaign messages need to have the term set when + // sending. + // - MsgVote: m.Term is the term the node is campaigning for, + // non-zero as we increment the term when campaigning. + // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was + // granted, non-zero for the same reason MsgVote is + // - MsgPreVote: m.Term is the term the node will campaign, + // non-zero as we use m.Term to indicate the next term we'll be + // campaigning for + // - MsgPreVoteResp: m.Term is the term received in the original + // MsgPreVote if the pre-vote was granted, non-zero for the + // same reasons MsgPreVote is panic(fmt.Sprintf("term should be set when sending %s", m.Type)) } } else { @@ -364,9 +416,17 @@ func (r *raft) send(m pb.Message) { r.msgs = append(r.msgs, m) } +func (r *raft) getProgress(id uint64) *Progress { + if pr, ok := r.prs[id]; ok { + return pr + } + + return r.learnerPrs[id] +} + // sendAppend sends RPC, with entries to the given peer. func (r *raft) sendAppend(to uint64) { - pr := r.prs[to] + pr := r.getProgress(to) if pr.IsPaused() { return } @@ -431,7 +491,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.prs[to].Match, r.raftLog.committed) + commit := min(r.getProgress(to).Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, @@ -442,15 +502,26 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { r.send(m) } +func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { + for id, pr := range r.prs { + f(id, pr) + } + + for id, pr := range r.learnerPrs { + f(id, pr) + } +} + // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { - for id := range r.prs { + r.forEachProgress(func(id uint64, _ *Progress) { if id == r.id { - continue + return } + r.sendAppend(id) - } + }) } // bcastHeartbeat sends RPC, without entries to all the peers. @@ -464,12 +535,12 @@ func (r *raft) bcastHeartbeat() { } func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { - for id := range r.prs { + r.forEachProgress(func(id uint64, _ *Progress) { if id == r.id { - continue + return } r.sendHeartbeat(id, ctx) - } + }) } // maybeCommit attempts to advance the commit index. Returns true if @@ -478,8 +549,8 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { func (r *raft) maybeCommit() bool { // TODO(bmizerany): optimize.. 
Currently naive mis := make(uint64Slice, 0, len(r.prs)) - for id := range r.prs { - mis = append(mis, r.prs[id].Match) + for _, p := range r.prs { + mis = append(mis, p.Match) } sort.Sort(sort.Reverse(mis)) mci := mis[r.quorum()-1] @@ -500,12 +571,13 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() r.votes = make(map[uint64]bool) - for id := range r.prs { - r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)} + r.forEachProgress(func(id uint64, pr *Progress) { + *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner} if id == r.id { - r.prs[id].Match = r.raftLog.lastIndex() + pr.Match = r.raftLog.lastIndex() } - } + }) + r.pendingConf = false r.readOnly = newReadOnly(r.readOnly.option) } @@ -517,7 +589,7 @@ func (r *raft) appendEntry(es ...pb.Entry) { es[i].Index = li + 1 + uint64(i) } r.raftLog.append(es...) - r.prs[r.id].maybeUpdate(r.raftLog.lastIndex()) + r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex()) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() } @@ -589,6 +661,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate + r.votes = make(map[uint64]bool) r.tick = r.tickElection r.state = StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) @@ -682,7 +755,6 @@ func (r *raft) Step(m pb.Message) error { case m.Term == 0: // local message case m.Term > r.Term: - lead := m.From if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { force := bytes.Equal(m.Context, []byte(campaignTransfer)) inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout @@ -693,7 +765,6 @@ func (r *raft) Step(m pb.Message) error { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) return nil } - lead = None } switch { case m.Type == pb.MsgPreVote: @@ -707,7 +778,11 @@ func (r *raft) Step(m pb.Message) error { default: r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) - r.becomeFollower(m.Term, lead) + if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { + r.becomeFollower(m.Term, m.From) + } else { + r.becomeFollower(m.Term, None) + } } case m.Term < r.Term: @@ -757,12 +832,27 @@ func (r *raft) Step(m pb.Message) error { } case pb.MsgVote, pb.MsgPreVote: + if r.isLearner { + // TODO: learner may need to vote, in case of node down when confchange. + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + return nil + } // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should // always equal r.Term. if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)}) + // When responding to Msg{Pre,}Vote messages we include the term + // from the message, not the local term. 
To see why consider the + // case where a single node was previously partitioned away and + // it's local term is now of date. If we include the local term + // (recall that for pre-votes we don't update the local term), the + // (pre-)campaigning node on the other end will proceed to ignore + // the message (it ignores all out of date messages). + // The term in the original message and current local term are the + // same in the case of regular votes, but different for pre-votes. + r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) if m.Type == pb.MsgVote { // Only record real votes. r.electionElapsed = 0 @@ -771,7 +861,7 @@ func (r *raft) Step(m pb.Message) error { } else { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true}) + r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } default: @@ -836,10 +926,7 @@ func stepLeader(r *raft, m pb.Message) { r.readOnly.addRequest(r.raftLog.committed, m) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: - var ri uint64 - if r.checkQuorum { - ri = r.raftLog.committed - } + ri := r.raftLog.committed if m.From == None || m.From == r.id { // from local member r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } else { @@ -854,8 +941,8 @@ func stepLeader(r *raft, m pb.Message) { } // All other message types require a progress for m.From (pr). - pr, prOk := r.prs[m.From] - if !prOk { + pr := r.getProgress(m.From) + if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return } @@ -954,6 +1041,10 @@ func stepLeader(r *raft, m pb.Message) { } r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) case pb.MsgTransferLeader: + if pr.IsLearner { + r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) + return + } leadTransferee := m.From lastLeadTransferee := r.leadTransferee if lastLeadTransferee != None { @@ -1033,6 +1124,9 @@ func stepFollower(r *raft, m pb.Message) { if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return + } else if r.disableProposalForwarding { + r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) + return } m.To = r.lead r.send(m) @@ -1127,20 +1221,37 @@ func (r *raft) restore(s pb.Snapshot) bool { return false } + // The normal peer can't become learner. 
+ if !r.isLearner { + for _, id := range s.Metadata.ConfState.Learners { + if id == r.id { + r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term) + return false + } + } + } + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) r.prs = make(map[uint64]*Progress) - for _, n := range s.Metadata.ConfState.Nodes { + r.learnerPrs = make(map[uint64]*Progress) + r.restoreNode(s.Metadata.ConfState.Nodes, false) + r.restoreNode(s.Metadata.ConfState.Learners, true) + return true +} + +func (r *raft) restoreNode(nodes []uint64, isLearner bool) { + for _, n := range nodes { match, next := uint64(0), r.raftLog.lastIndex()+1 if n == r.id { match = next - 1 + r.isLearner = isLearner } - r.setProgress(n, match, next) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n]) + r.setProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n)) } - return true } // promotable indicates whether state machine can be promoted to leader, @@ -1151,18 +1262,46 @@ func (r *raft) promotable() bool { } func (r *raft) addNode(id uint64) { + r.addNodeOrLearnerNode(id, false) +} + +func (r *raft) addLearner(id uint64) { + r.addNodeOrLearnerNode(id, true) +} + +func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { r.pendingConf = false - if _, ok := r.prs[id]; ok { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return + pr := r.getProgress(id) + if pr == nil { + r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // can only change Learner to Voter + r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id) + return + } + + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } + + // change Learner to Voter, use origin Learner progress + delete(r.learnerPrs, id) + pr.IsLearner = false + r.prs[id] = pr + } + + if r.id == id { + r.isLearner = isLearner } - r.setProgress(id, 0, r.raftLog.lastIndex()+1) // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked // before the added node has a chance to communicate with us. - r.prs[id].RecentActive = true + pr = r.getProgress(id) + pr.RecentActive = true } func (r *raft) removeNode(id uint64) { @@ -1170,7 +1309,7 @@ func (r *raft) removeNode(id uint64) { r.pendingConf = false // do not try to commit or abort transferring if there is no nodes in the cluster. 
- if len(r.prs) == 0 { + if len(r.prs) == 0 && len(r.learnerPrs) == 0 { return } @@ -1187,12 +1326,22 @@ func (r *raft) removeNode(id uint64) { func (r *raft) resetPendingConf() { r.pendingConf = false } -func (r *raft) setProgress(id, match, next uint64) { - r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} +func (r *raft) setProgress(id, match, next uint64, isLearner bool) { + if !isLearner { + delete(r.learnerPrs, id) + r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} + return + } + + if _, ok := r.prs[id]; ok { + panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) + } + r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} } func (r *raft) delProgress(id uint64) { delete(r.prs, id) + delete(r.learnerPrs, id) } func (r *raft) loadState(state pb.HardState) { @@ -1222,18 +1371,18 @@ func (r *raft) resetRandomizedElectionTimeout() { func (r *raft) checkQuorumActive() bool { var act int - for id := range r.prs { + r.forEachProgress(func(id uint64, pr *Progress) { if id == r.id { // self is always active act++ - continue + return } - if r.prs[id].RecentActive { + if pr.RecentActive && !pr.IsLearner { act++ } - r.prs[id].RecentActive = false - } + pr.RecentActive = false + }) return act >= r.quorum() } diff --git a/vendor/github.com/coreos/etcd/raft/rawnode.go b/vendor/github.com/coreos/etcd/raft/rawnode.go index b950d5169a5..925cb851c4a 100644 --- a/vendor/github.com/coreos/etcd/raft/rawnode.go +++ b/vendor/github.com/coreos/etcd/raft/rawnode.go @@ -175,6 +175,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { switch cc.Type { case pb.ConfChangeAddNode: rn.raft.addNode(cc.NodeID) + case pb.ConfChangeAddLearnerNode: + rn.raft.addLearner(cc.NodeID) case pb.ConfChangeRemoveNode: rn.raft.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: @@ -191,7 +193,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) { return ErrStepLocalMsg } - if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) { + if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { return rn.raft.Step(m) } return ErrStepPeerNotFound diff --git a/vendor/github.com/coreos/etcd/raft/read_only.go b/vendor/github.com/coreos/etcd/raft/read_only.go index d0085237e36..ae746fa73eb 100644 --- a/vendor/github.com/coreos/etcd/raft/read_only.go +++ b/vendor/github.com/coreos/etcd/raft/read_only.go @@ -18,7 +18,7 @@ import pb "github.com/coreos/etcd/raft/raftpb" // ReadState provides state for read only query. // It's caller's responsibility to call ReadIndex first before getting -// this state from ready, It's also caller's duty to differentiate if this +// this state from ready, it's also caller's duty to differentiate if this // state is what it requests through RequestCtx, eg. given a unique id as // RequestCtx type ReadState struct { diff --git a/vendor/github.com/coreos/etcd/raft/status.go b/vendor/github.com/coreos/etcd/raft/status.go index b690fa56b95..f4d3d86a4e3 100644 --- a/vendor/github.com/coreos/etcd/raft/status.go +++ b/vendor/github.com/coreos/etcd/raft/status.go @@ -28,11 +28,17 @@ type Status struct { Applied uint64 Progress map[uint64]Progress + + LeadTransferee uint64 } // getStatus gets a copy of the current raft status. 
func getStatus(r *raft) Status { - s := Status{ID: r.id} + s := Status{ + ID: r.id, + LeadTransferee: r.leadTransferee, + } + s.HardState = r.hardState() s.SoftState = *r.softState() @@ -43,6 +49,10 @@ func getStatus(r *raft) Status { for id, p := range r.prs { s.Progress[id] = *p } + + for id, p := range r.learnerPrs { + s.Progress[id] = *p + } } return s @@ -51,19 +61,21 @@ func getStatus(r *raft) Status { // MarshalJSON translates the raft status into JSON. // TODO: try to simplify this by introducing ID type into raft func (s Status) MarshalJSON() ([]byte, error) { - j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"progress":{`, - s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState) + j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"applied":%d,"progress":{`, + s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState, s.Applied) if len(s.Progress) == 0 { - j += "}}" + j += "}," } else { for k, v := range s.Progress { subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%q},`, k, v.Match, v.Next, v.State) j += subj } // remove the trailing "," - j = j[:len(j)-1] + "}}" + j = j[:len(j)-1] + "}," } + + j += fmt.Sprintf(`"leadtransferee":"%x"}`, s.LeadTransferee) return []byte(j), nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index c6d79c1bdd2..1bd337bb44f 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -141,12 +141,12 @@ "revisionTime": "2016-10-10T02:54:55Z" }, { - "checksumSHA1": "h1nLibY0IliypSG0cwbXpSpcsMA=", + "checksumSHA1": "4VUg2Be1lkd0wm8iVTkoMTa58Ow=", "path": "github.com/coreos/etcd/raft", - "revision": "c9504f61fc7f29b0ad30bf8bab02d9e1b600e962", - "revisionTime": "2018-06-15T16:40:41Z", - "version": "v3.2.23", - "versionExact": "v3.2.23" + "revision": "27fc7e2296f506182f58ce846e48f36b34fe6842", + "revisionTime": "2018-10-10T17:17:54Z", + "version": "v3.3.10", + "versionExact": "v3.3.10" }, { "checksumSHA1": "9UUP0nQdKxvJZOFg7e8FP4gzzgA=", From c6502144413a1b13d05382f561d96c710bdfe6a6 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 19 Nov 2018 19:35:47 -0800 Subject: [PATCH 07/12] Update raftpb proto package as well --- .../coreos/etcd/raft/raftpb/raft.pb.go | 216 +++++++++++------- .../coreos/etcd/raft/raftpb/raft.proto | 10 +- vendor/vendor.json | 16 +- 3 files changed, 153 insertions(+), 89 deletions(-) diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go b/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go index 4c6e79d58a0..fd9ee3729ec 100644 --- a/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go +++ b/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: raft.proto -// DO NOT EDIT! /* Package raftpb is a generated protocol buffer package. 
@@ -26,6 +25,8 @@ import ( math "math" + _ "github.com/gogo/protobuf/gogoproto" + io "io" ) @@ -162,20 +163,23 @@ func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, type ConfChangeType int32 const ( - ConfChangeAddNode ConfChangeType = 0 - ConfChangeRemoveNode ConfChangeType = 1 - ConfChangeUpdateNode ConfChangeType = 2 + ConfChangeAddNode ConfChangeType = 0 + ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 + ConfChangeAddLearnerNode ConfChangeType = 3 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", 2: "ConfChangeUpdateNode", + 3: "ConfChangeAddLearnerNode", } var ConfChangeType_value = map[string]int32{ - "ConfChangeAddNode": 0, - "ConfChangeRemoveNode": 1, - "ConfChangeUpdateNode": 2, + "ConfChangeAddNode": 0, + "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, + "ConfChangeAddLearnerNode": 3, } func (x ConfChangeType) Enum() *ConfChangeType { @@ -267,6 +271,7 @@ func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []in type ConfState struct { Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"` + Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -537,6 +542,13 @@ func (m *ConfState) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintRaft(dAtA, i, uint64(num)) } } + if len(m.Learners) > 0 { + for _, num := range m.Learners { + dAtA[i] = 0x10 + i++ + i = encodeVarintRaft(dAtA, i, uint64(num)) + } + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -579,24 +591,6 @@ func (m *ConfChange) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func encodeFixed64Raft(dAtA []byte, offset int, v uint64) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - dAtA[offset+4] = uint8(v >> 32) - dAtA[offset+5] = uint8(v >> 40) - dAtA[offset+6] = uint8(v >> 48) - dAtA[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Raft(dAtA []byte, offset int, v uint32) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - return offset + 4 -} func encodeVarintRaft(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -700,6 +694,11 @@ func (m *ConfState) Size() (n int) { n += 1 + sovRaft(uint64(e)) } } + if len(m.Learners) > 0 { + for _, e := range m.Learners { + n += 1 + sovRaft(uint64(e)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1619,6 +1618,68 @@ func (m *ConfState) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType) } + case 2: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Learners = append(m.Learners, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + packedLen + if postIndex > l { + return 
io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Learners = append(m.Learners, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType) + } default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -1888,55 +1949,56 @@ var ( func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 790 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0x16, 0x29, 0xea, 0x6f, 0x28, 0xcb, 0xab, 0xb5, 0x5a, 0x2c, 0x0c, 0x43, 0x55, 0x85, 0x1e, - 0x04, 0x17, 0x76, 0x5b, 0x1d, 0x7a, 0xe8, 0xcd, 0x96, 0x0a, 0x58, 0x40, 0x65, 0xb8, 0xb2, 0xdc, - 0x43, 0x83, 0x20, 0x58, 0x8b, 0x2b, 0x4a, 0x89, 0xc9, 0x25, 0x96, 0x2b, 0xc7, 0xbe, 0x04, 0x79, - 0x80, 0x3c, 0x40, 0x2e, 0x79, 0x1f, 0x1f, 0x0d, 0xe4, 0x1e, 0xc4, 0xce, 0x8b, 0x04, 0xbb, 0x5c, - 0x4a, 0x94, 0x74, 0xdb, 0xf9, 0xbe, 0xe1, 0xcc, 0x37, 0xdf, 0xce, 0x12, 0x40, 0xd0, 0xa9, 0x3c, - 0x8e, 0x04, 0x97, 0x1c, 0x17, 0xd5, 0x39, 0xba, 0xde, 0x6f, 0xf8, 0xdc, 0xe7, 0x1a, 0xfa, 0x4d, - 0x9d, 0x12, 0xb6, 0xfd, 0x0e, 0x0a, 0x7f, 0x87, 0x52, 0xdc, 0xe3, 0x5f, 0xc1, 0x19, 0xdf, 0x47, - 0x8c, 0x58, 0x2d, 0xab, 0x53, 0xeb, 0xd6, 0x8f, 0x93, 0xaf, 0x8e, 0x35, 0xa9, 0x88, 0x53, 0xe7, - 0xe1, 0xcb, 0x4f, 0xb9, 0x91, 0x4e, 0xc2, 0x04, 0x9c, 0x31, 0x13, 0x01, 0xb1, 0x5b, 0x56, 0xc7, - 0x59, 0x32, 0x4c, 0x04, 0x78, 0x1f, 0x0a, 0x83, 0xd0, 0x63, 0x77, 0x24, 0x9f, 0xa1, 0x12, 0x08, - 0x63, 0x70, 0xfa, 0x54, 0x52, 0xe2, 0xb4, 0xac, 0x4e, 0x75, 0xa4, 0xcf, 0xed, 0xf7, 0x16, 0xa0, - 0xcb, 0x90, 0x46, 0xf1, 0x8c, 0xcb, 0x21, 0x93, 0xd4, 0xa3, 0x92, 0xe2, 0x3f, 0x01, 0x26, 0x3c, - 0x9c, 0xbe, 0x8a, 0x25, 0x95, 0x89, 0x22, 0x77, 0xa5, 0xa8, 0xc7, 0xc3, 0xe9, 0xa5, 0x22, 0x4c, - 0xf1, 0xca, 0x24, 0x05, 0x54, 0xf3, 0xb9, 0x6e, 0x9e, 0xd5, 0x95, 0x40, 0x4a, 0xb2, 0x54, 0x92, - 0xb3, 0xba, 0x34, 0xd2, 0xfe, 0x1f, 0xca, 0xa9, 0x02, 0x25, 0x51, 0x29, 0xd0, 0x3d, 0xab, 0x23, - 0x7d, 0xc6, 0x7f, 0x41, 0x39, 0x30, 0xca, 0x74, 0x61, 0xb7, 0x4b, 0x52, 0x2d, 0x9b, 0xca, 0x4d, - 0xdd, 0x65, 0x7e, 0xfb, 0x53, 0x1e, 0x4a, 0x43, 0x16, 0xc7, 0xd4, 0x67, 0xf8, 0x08, 0x1c, 0xb9, - 0x72, 0x78, 0x2f, 0xad, 0x61, 0xe8, 0xac, 0xc7, 0x2a, 0x0d, 0x37, 0xc0, 0x96, 0x7c, 0x6d, 0x12, - 0x5b, 0x72, 0x35, 0xc6, 0x54, 0xf0, 0x8d, 0x31, 0x14, 0xb2, 0x1c, 0xd0, 0xd9, 0x1c, 0x10, 0x37, - 0xa1, 0x74, 0xc3, 0x7d, 0x7d, 0x61, 0x85, 0x0c, 0x99, 0x82, 0x2b, 0xdb, 0x8a, 0xdb, 0xb6, 0x1d, - 0x41, 0x89, 0x85, 0x52, 0xcc, 0x59, 0x4c, 0x4a, 0xad, 0x7c, 0xc7, 0xed, 0xee, 0xac, 0x6d, 0x46, - 0x5a, 0xca, 0xe4, 0xe0, 0x03, 0x28, 0x4e, 0x78, 0x10, 0xcc, 0x25, 0x29, 0x67, 0x6a, 0x19, 0x0c, - 0x77, 0xa1, 0x1c, 0x1b, 0xc7, 0x48, 0x45, 0x3b, 0x89, 0x36, 0x9d, 0x4c, 0x1d, 0x4c, 0xf3, 0x54, - 0x45, 0xc1, 0x5e, 0xb3, 0x89, 0x24, 0xd0, 0xb2, 0x3a, 0xe5, 0xb4, 0x62, 0x82, 0xe1, 0x5f, 0x00, - 0x92, 0xd3, 0xd9, 0x3c, 0x94, 0xc4, 0xcd, 0xf4, 0xcc, 0xe0, 0x98, 0x40, 0x69, 0xc2, 0x43, 0xc9, - 0xee, 0x24, 0xa9, 0xea, 0x8b, 0x4d, 0xc3, 0xf6, 0x4b, 0xa8, 0x9c, 0x51, 0xe1, 0x25, 0xeb, 0x93, - 0x3a, 0x68, 0x6d, 0x39, 0x48, 0xc0, 0xb9, 0xe5, 0x92, 0xad, 0xef, 0xbb, 0x42, 0x32, 0x03, 0xe7, - 0xb7, 0x07, 0x6e, 0xff, 0x0c, 0x95, 0xe5, 0xba, 0xe2, 0x06, 0x14, 0x42, 0xee, 0xb1, 0x98, 0x58, - 0xad, 0x7c, 0xc7, 0x19, 0x25, 
0x41, 0xfb, 0x83, 0x05, 0xa0, 0x72, 0x7a, 0x33, 0x1a, 0xfa, 0xfa, - 0xd6, 0x07, 0xfd, 0x35, 0x05, 0xf6, 0xa0, 0x8f, 0x7f, 0x37, 0x8f, 0xd3, 0xd6, 0xab, 0xf3, 0x63, - 0xf6, 0x29, 0x24, 0xdf, 0x6d, 0xbd, 0xd0, 0x03, 0x28, 0x9e, 0x73, 0x8f, 0x0d, 0xfa, 0xeb, 0xba, - 0x12, 0x4c, 0x19, 0xd2, 0x33, 0x86, 0x24, 0x8f, 0x31, 0x0d, 0x0f, 0xff, 0x80, 0xca, 0xf2, 0xc9, - 0xe3, 0x5d, 0x70, 0x75, 0x70, 0xce, 0x45, 0x40, 0x6f, 0x50, 0x0e, 0xef, 0xc1, 0xae, 0x06, 0x56, - 0x8d, 0x91, 0x75, 0xf8, 0xd9, 0x06, 0x37, 0xb3, 0xc4, 0x18, 0xa0, 0x38, 0x8c, 0xfd, 0xb3, 0x45, - 0x84, 0x72, 0xd8, 0x85, 0xd2, 0x30, 0xf6, 0x4f, 0x19, 0x95, 0xc8, 0x32, 0xc1, 0x85, 0xe0, 0x11, - 0xb2, 0x4d, 0xd6, 0x49, 0x14, 0xa1, 0x3c, 0xae, 0x01, 0x24, 0xe7, 0x11, 0x8b, 0x23, 0xe4, 0x98, - 0xc4, 0xff, 0xb8, 0x64, 0xa8, 0xa0, 0x44, 0x98, 0x40, 0xb3, 0x45, 0xc3, 0xaa, 0x85, 0x41, 0x25, - 0x8c, 0xa0, 0xaa, 0x9a, 0x31, 0x2a, 0xe4, 0xb5, 0xea, 0x52, 0xc6, 0x0d, 0x40, 0x59, 0x44, 0x7f, - 0x54, 0xc1, 0x18, 0x6a, 0xc3, 0xd8, 0xbf, 0x0a, 0x05, 0xa3, 0x93, 0x19, 0xbd, 0xbe, 0x61, 0x08, - 0x70, 0x1d, 0x76, 0x4c, 0x21, 0x75, 0x41, 0x8b, 0x18, 0xb9, 0x26, 0xad, 0x37, 0x63, 0x93, 0x37, - 0xff, 0x2e, 0xb8, 0x58, 0x04, 0xa8, 0x8a, 0x7f, 0x80, 0xfa, 0x30, 0xf6, 0xc7, 0x82, 0x86, 0xf1, - 0x94, 0x89, 0x7f, 0x18, 0xf5, 0x98, 0x40, 0x3b, 0xe6, 0xeb, 0xf1, 0x3c, 0x60, 0x7c, 0x21, 0xcf, - 0xf9, 0x5b, 0x54, 0x33, 0x62, 0x46, 0x8c, 0x7a, 0xfa, 0x87, 0x87, 0x76, 0x8d, 0x98, 0x25, 0xa2, - 0xc5, 0x20, 0x33, 0xef, 0x85, 0x60, 0x7a, 0xc4, 0xba, 0xe9, 0x6a, 0x62, 0x9d, 0x83, 0x0f, 0x5f, - 0x40, 0x6d, 0xfd, 0x7a, 0x95, 0x8e, 0x15, 0x72, 0xe2, 0x79, 0xea, 0x2e, 0x51, 0x0e, 0x13, 0x68, - 0xac, 0xe0, 0x11, 0x0b, 0xf8, 0x2d, 0xd3, 0x8c, 0xb5, 0xce, 0x5c, 0x45, 0x1e, 0x95, 0x09, 0x63, - 0x9f, 0x92, 0x87, 0xa7, 0x66, 0xee, 0xf1, 0xa9, 0x99, 0x7b, 0x78, 0x6e, 0x5a, 0x8f, 0xcf, 0x4d, - 0xeb, 0xeb, 0x73, 0xd3, 0xfa, 0xf8, 0xad, 0x99, 0xfb, 0x1e, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x30, - 0x01, 0x41, 0x3a, 0x06, 0x00, 0x00, + // 815 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45, + 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38, + 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b, + 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20, + 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3, + 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9, + 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f, + 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77, + 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24, + 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37, + 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01, + 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03, + 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42, + 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21, + 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36, + 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 
0xd1, 0x28, 0xb0, 0x3d, 0xfb, + 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95, + 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02, + 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36, + 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20, + 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d, + 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d, + 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c, + 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3, + 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53, + 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa, + 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa, + 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0, + 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73, + 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb, + 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b, + 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67, + 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60, + 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70, + 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63, + 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1, + 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe, + 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc, + 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83, + 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21, + 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1, + 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6, + 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4, + 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65, + 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9, + 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa, + 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73, + 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0, + 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c, + 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8, + 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00, } diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto 
b/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
index 806a43634fd..644ce7b8f2f 100644
--- a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
+++ b/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
@@ -76,13 +76,15 @@ message HardState {
 }
 
 message ConfState {
-	repeated uint64 nodes = 1;
+	repeated uint64 nodes    = 1;
+	repeated uint64 learners = 2;
 }
 
 enum ConfChangeType {
-	ConfChangeAddNode    = 0;
-	ConfChangeRemoveNode = 1;
-	ConfChangeUpdateNode = 2;
+	ConfChangeAddNode        = 0;
+	ConfChangeRemoveNode     = 1;
+	ConfChangeUpdateNode     = 2;
+	ConfChangeAddLearnerNode = 3;
 }
 
 message ConfChange {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 1bd337bb44f..92fa0041bbe 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -149,12 +149,12 @@
 			"versionExact": "v3.3.10"
 		},
 		{
-			"checksumSHA1": "9UUP0nQdKxvJZOFg7e8FP4gzzgA=",
+			"checksumSHA1": "cwEnAGl7uzwDepjDZcIocMVEVEE=",
 			"path": "github.com/coreos/etcd/raft/raftpb",
-			"revision": "c9504f61fc7f29b0ad30bf8bab02d9e1b600e962",
-			"revisionTime": "2018-06-15T16:40:41Z",
-			"version": "v3.2.23",
-			"versionExact": "v3.2.23"
+			"revision": "27fc7e2296f506182f58ce846e48f36b34fe6842",
+			"revisionTime": "2018-10-10T17:17:54Z",
+			"version": "v3.3.10",
+			"versionExact": "v3.3.10"
 		},
 		{
 			"checksumSHA1": "Lf3uUXTkKK5DJ37BxQvxO1Fq+K8=",
@@ -307,10 +307,10 @@
 			"revisionTime": "2016-01-25T20:49:56Z"
 		},
 		{
-			"checksumSHA1": "GaJLoEuMGnP5ofXvuweAI4wx06U=",
+			"checksumSHA1": "MVW67aGwWsD1pGmL04HlQlaOfKE=",
 			"path": "github.com/golang/protobuf/proto",
-			"revision": "d3de96c4c28ef8af3aa1a892fc481e0f103c01ff",
-			"revisionTime": "2018-10-25T22:50:59Z"
+			"revision": "882cf97a83ad205fd22af574246a3bc647d7a7d2",
+			"revisionTime": "2018-11-20T00:18:57Z"
 		},
 		{
 			"checksumSHA1": "z4copNgeTN77OymdDKqLaIK/vSI=",

From 9046554fb0477951df7aa6c2691210536a54524b Mon Sep 17 00:00:00 2001
From: Manish R Jain
Date: Mon, 19 Nov 2018 20:24:19 -0800
Subject: [PATCH 08/12] Dirty changes to ensure that Zero followers don't forward proposals to Zero leader.

---
 dgraph/cmd/zero/raft.go |  3 +++
 dgraph/cmd/zero/run.go  |  4 +++
 worker/groups.go        | 56 +++++++++++++++++++++++++++++------------
 3 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index d0139a350bf..6bcb6de2623 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -70,6 +70,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er
 	}
 
 	propose := func(timeout time.Duration) error {
+		if !n.AmLeader() {
+			return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal)
+		}
 		cctx, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
 
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index 5c909aff01b..bed2aff9584 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -136,6 +136,10 @@ func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.Di
 
 	rc := pb.RaftContext{Id: opts.nodeId, Addr: opts.myAddr, Group: 0}
 	m := conn.NewNode(&rc, store)
+
+	// Zero followers should not forward proposals to the leader, to avoid applying txn commits
+	// that were calculated by a previous Zero leader.
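+	// (In etcd/raft, Config.DisableProposalForwarding makes a follower drop MsgProp
+	// messages instead of relaying them to the leader.)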
+ m.Cfg.DisableProposalForwarding = true st.rs = &conn.RaftServer{Node: m} st.node = &node{Node: m, ctx: context.Background(), stop: make(chan struct{})} diff --git a/worker/groups.go b/worker/groups.go index 57c09e630c0..287d7f15ea5 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -92,26 +92,37 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { glog.Infof("Current Raft Id: %d\n", Config.RaftId) // Successfully connect with dgraphzero, before doing anything else. - p := conn.Get().Connect(Config.ZeroAddr) - // Connect with dgraphzero and figure out what group we should belong to. - zc := pb.NewZeroClient(p.Get()) - var connState *pb.ConnectionState + // Connect with Zero leader and figure out what group we should belong to. m := &pb.Member{Id: Config.RaftId, Addr: Config.MyAddr} - delay := 50 * time.Millisecond - maxHalfDelay := 3 * time.Second + var connState *pb.ConnectionState var err error - for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 + for { + pl := gr.connToZeroLeader(Config.ZeroAddr) + if pl == nil { + continue + } + zc := pb.NewZeroClient(pl.Get()) connState, err = zc.Connect(gr.ctx, m) if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() { break } - glog.Errorf("Error while connecting with group zero: %v", err) - time.Sleep(delay) - if delay <= maxHalfDelay { - delay *= 2 - } } + + // delay := 50 * time.Millisecond + // maxHalfDelay := 3 * time.Second + // var err error + // for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 + // connState, err = zc.Connect(gr.ctx, m) + // if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() { + // break + // } + // glog.Errorf("Error while connecting with group zero: %v", err) + // time.Sleep(delay) + // if delay <= maxHalfDelay { + // delay *= 2 + // } + // } x.CheckfNoTrace(err) if connState.GetMember() == nil || connState.GetState() == nil { x.Fatalf("Unable to join cluster via dgraphzero") @@ -121,8 +132,6 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { // This timestamp would be used for reading during snapshot after bulk load. // The stream is async, we need this information before we start or else replica might // not get any data. - // TODO: Do we really need this? - // posting.Oracle().SetMaxPending(connState.MaxPending) gr.applyState(connState.GetState()) gid := gr.groupId() @@ -352,7 +361,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet { // We don't know about this tablet. // Check with dgraphzero if we can serve it. - pl := g.AnyServer(0) + pl := g.Leader(0) if pl == nil { return nil } @@ -494,7 +503,7 @@ func (g *groupi) triggerMembershipSync() { const connBaseDelay = 100 * time.Millisecond -func (g *groupi) connToZeroLeader() *conn.Pool { +func (g *groupi) connToZeroLeader(addr string) *conn.Pool { pl := g.Leader(0) if pl != nil { return pl @@ -509,10 +518,15 @@ func (g *groupi) connToZeroLeader() *conn.Pool { if delay <= maxHalfDelay { delay *= 2 } - pl := g.AnyServer(0) - if pl == nil { - glog.V(1).Infof("No healthy Zero server found. Retrying...") - continue + var pl *conn.Pool + if len(addr) > 0 { + pl = conn.Get().Connect(addr) + } else { + pl = g.AnyServer(0) + if pl == nil { + glog.V(1).Infof("No healthy Zero server found. 
Retrying...") + continue + } } zc := pb.NewZeroClient(pl.Get()) connState, err := zc.Connect(gr.ctx, &pb.Member{ClusterInfoOnly: true}) @@ -552,7 +566,7 @@ START: default: } - pl := g.connToZeroLeader() + pl := g.connToZeroLeader("") // We should always have some connection to dgraphzero. if pl == nil { glog.Warningln("Membership update: No Zero server known.") From fcdd3ddfbb6f0a09838dcbbf69bf7c9c985095c5 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 20 Nov 2018 12:03:33 -0800 Subject: [PATCH 09/12] Various Opencensus integration changes. --- conn/pool.go | 2 ++ dgraph/cmd/alpha/run.go | 64 +++++++++++++++++++-------------------- dgraph/cmd/zero/oracle.go | 4 +-- dgraph/cmd/zero/run.go | 33 ++++++++++---------- edgraph/server.go | 15 ++------- 5 files changed, 54 insertions(+), 64 deletions(-) diff --git a/conn/pool.go b/conn/pool.go index 3c876bbedfb..31c41743935 100644 --- a/conn/pool.go +++ b/conn/pool.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/golang/glog" + "go.opencensus.io/plugin/ocgrpc" "google.golang.org/grpc" ) @@ -124,6 +125,7 @@ func (p *Pools) Connect(addr string) *Pool { // NewPool creates a new "pool" with one gRPC connection, refcount 0. func NewPool(addr string) (*Pool, error) { conn, err := grpc.Dial(addr, + grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(x.GrpcMaxSize), grpc.MaxCallSendMsgSize(x.GrpcMaxSize)), diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 161b58a4d91..4c6c1c420bd 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -44,8 +44,8 @@ import ( "github.com/spf13/cobra" "go.opencensus.io/exporter/jaeger" "go.opencensus.io/plugin/ocgrpc" - "go.opencensus.io/stats/view" otrace "go.opencensus.io/trace" + "go.opencensus.io/zpages" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -92,7 +92,9 @@ they form a Raft group and provide synchronous replication. "[mmap, disk] Specifies how Badger Value log is stored."+ " mmap consumes more RAM, but provides better performance.") - flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") + // OpenCensus flags. + flag.Float64("trace", 1.0, "The ratio of queries to trace.") + // flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.") @@ -103,7 +105,6 @@ they form a Raft group and provide synchronous replication. flag.String("export", "export", "Folder in which to store exports.") flag.Int("pending_proposals", 256, "Number of pending mutation proposals. 
Useful for rate limiting.") - flag.Float64("trace", 0.0, "The ratio of queries to trace.") flag.String("my", "", "IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.") flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc), @@ -242,36 +243,13 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) { func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) { defer wg.Done() - if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - glog.Fatalf("Unable to register opencensus: %v", err) - } - handler := &ocgrpc.ServerHandler{ - IsPublicEndpoint: false, - StartOptions: otrace.StartOptions{ - Sampler: otrace.AlwaysSample(), - }, - } - opt := []grpc.ServerOption{ - grpc.MaxRecvMsgSize(x.GrpcMaxSize), - grpc.MaxSendMsgSize(x.GrpcMaxSize), - grpc.MaxConcurrentStreams(1000), - grpc.StatsHandler(handler), - } - if tlsCfg != nil { - opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg))) - } - - if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 { + if collector := Alpha.Conf.GetString("jaeger.collector"); len(collector) > 0 { // Port details: https://www.jaegertracing.io/docs/getting-started/ - // Default endpoints are: - // agentEndpointURI := "localhost:6831" - // collectorEndpointURI := "http://localhost:14268" - collector := Alpha.Conf.GetString("jaeger.collector") + // Default collectorEndpointURI := "http://localhost:14268" je, err := jaeger.NewExporter(jaeger.Options{ - AgentEndpoint: agent, - Endpoint: collector, - ServiceName: "dgraph.alpha", + Endpoint: collector, + ServiceName: "dgraph.alpha", }) if err != nil { log.Fatalf("Failed to create the Jaeger exporter: %v", err) @@ -279,6 +257,21 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) { // And now finally register it as a Trace Exporter otrace.RegisterExporter(je) } + // Exclusively for stats, metrics, etc. Not for tracing. + // var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...) + // if err := view.Register(views...); err != nil { + // glog.Fatalf("Unable to register OpenCensus stats: %v", err) + // } + + opt := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(x.GrpcMaxSize), + grpc.MaxSendMsgSize(x.GrpcMaxSize), + grpc.MaxConcurrentStreams(1000), + grpc.StatsHandler(&ocgrpc.ServerHandler{}), + } + if tlsCfg != nil { + opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg))) + } s := grpc.NewServer(opt...) api.RegisterDgraphServer(s, &edgraph.Server{}) @@ -350,12 +343,18 @@ func setupServer() { http.HandleFunc("/alter", alterHandler) http.HandleFunc("/health", healthCheck) http.HandleFunc("/share", shareHandler) + + // TODO: Figure out what this is for? http.HandleFunc("/debug/store", storeStatsHandler) + http.HandleFunc("/admin/shutdown", shutDownHandler) http.HandleFunc("/admin/backup", backupHandler) http.HandleFunc("/admin/export", exportHandler) http.HandleFunc("/admin/config/lru_mb", memoryLimitHandler) + // Add OpenCensus z-pages. + zpages.Handle(http.DefaultServeMux, "/z") + http.HandleFunc("/", homeHandler) http.HandleFunc("/ui/keywords", keywordHandler) @@ -430,9 +429,8 @@ func run() { return true, true } } - - // TODO: Set this to the trace sampling rate set in flags. - otrace.ApplyConfig(otrace.Config{DefaultSampler: otrace.AlwaysSample()}) + otrace.ApplyConfig(otrace.Config{ + DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)}) // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). 
diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index d007db03cc7..16ec9712e6a 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -378,13 +378,11 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { return s.proposeTxn(ctx, src) } -var startOpt = otrace.WithSampler(otrace.AlwaysSample()) - func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.TxnContext, error) { if ctx.Err() != nil { return nil, ctx.Err() } - ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort", startOpt) + ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort") defer span.End() if !s.Node.AmLeader() { diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index bed2aff9584..ddae83a23fb 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -30,6 +30,7 @@ import ( "go.opencensus.io/exporter/jaeger" "go.opencensus.io/plugin/ocgrpc" otrace "go.opencensus.io/trace" + "go.opencensus.io/zpages" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -87,7 +88,9 @@ instances to achieve high-availability. flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.") flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.") - flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") + // OpenCensus flags. + flag.Float64("trace", 1.0, "The ratio of queries to trace.") + // flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") } @@ -104,16 +107,12 @@ type state struct { } func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) { - if agent := Zero.Conf.GetString("jaeger.agent"); len(agent) > 0 { + if collector := Zero.Conf.GetString("jaeger.collector"); len(collector) > 0 { // Port details: https://www.jaegertracing.io/docs/getting-started/ - // Default endpoints are: - // agentEndpointURI := "localhost:6831" - // collectorEndpointURI := "http://localhost:14268" - collector := Zero.Conf.GetString("jaeger.collector") + // Default collectorEndpointURI := "http://localhost:14268" je, err := jaeger.NewExporter(jaeger.Options{ - AgentEndpoint: agent, - Endpoint: collector, - ServiceName: "dgraph.zero", + Endpoint: collector, + ServiceName: "dgraph.zero", }) if err != nil { log.Fatalf("Failed to create the Jaeger exporter: %v", err) @@ -121,18 +120,17 @@ func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.Di // And now finally register it as a Trace Exporter otrace.RegisterExporter(je) } + // Exclusively for stats, metrics, etc. Not for tracing. + // var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...) 
+	// if err := view.Register(views...); err != nil {
+	// 	glog.Fatalf("Unable to register OpenCensus stats: %v", err)
+	// }
 
-	handler := &ocgrpc.ServerHandler{
-		IsPublicEndpoint: false,
-		StartOptions: otrace.StartOptions{
-			Sampler: otrace.AlwaysSample(),
-		},
-	}
 	s := grpc.NewServer(
 		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
 		grpc.MaxSendMsgSize(x.GrpcMaxSize),
 		grpc.MaxConcurrentStreams(1000),
-		grpc.StatsHandler(handler))
+		grpc.StatsHandler(&ocgrpc.ServerHandler{}))
 
 	rc := pb.RaftContext{Id: opts.nodeId, Addr: opts.myAddr, Group: 0}
 	m := conn.NewNode(&rc, store)
@@ -198,6 +196,8 @@ func run() {
 		}
 	}
 	grpc.EnableTracing = false
+	otrace.ApplyConfig(otrace.Config{
+		DefaultSampler: otrace.ProbabilitySampler(Zero.Conf.GetFloat64("trace"))})
 
 	addr := "localhost"
 	if opts.bindall {
@@ -239,6 +239,7 @@ func run() {
 	http.HandleFunc("/removeNode", st.removeNode)
 	http.HandleFunc("/moveTablet", st.moveTablet)
 	http.HandleFunc("/assignIds", st.assignUids)
+	zpages.Handle(http.DefaultServeMux, "/z")
 
 	// This must be here. It does not work if placed before Grpc init.
 	x.Check(st.node.initAndStartNode())
diff --git a/edgraph/server.go b/edgraph/server.go
index 7e44c9d4c6f..d8f7ecea0ec 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -63,11 +63,8 @@ type ServerState struct {
 	needTs chan tsReq
 }
 
-// TODO(tzdybal) - remove global
 var State ServerState
 
-var startOpt = otrace.WithSampler(otrace.AlwaysSample())
-
 func InitServerState() {
 	Config.validate()
 
@@ -332,13 +329,7 @@ func annotateStartTs(span *otrace.Span, ts uint64) {
 }
 
 func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assigned, err error) {
-	// parent := otrace.FromContext(ctx)
-	// var sp otrace.SpanContext
-	// if parent != nil {
-	// 	sp = parent.SpanContext()
-	// }
-	ctx, span := otrace.StartSpan(ctx, "Server.Mutate", startOpt)
-	// ctx, span := otrace.StartSpanWithRemoteParent(ctx, "Server.Mutate", sp)
+	ctx, span := otrace.StartSpan(ctx, "Server.Mutate")
 	defer span.End()
 
 	resp = &api.Assigned{}
@@ -448,7 +439,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
 	if glog.V(3) {
 		glog.Infof("Got a query: %+v", req)
 	}
-	ctx, span := otrace.StartSpan(ctx, "Server.Query", startOpt)
+	ctx, span := otrace.StartSpan(ctx, "Server.Query")
 	defer span.End()
 
 	if err := x.HealthCheck(); err != nil {
@@ -522,7 +513,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
 }
 
 func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
-	ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort", startOpt)
+	ctx, span := otrace.StartSpan(ctx, "Server.CommitOrAbort")
 	defer span.End()
 
 	if err := x.HealthCheck(); err != nil {

From b1cbfe4a846002eed0fd19d66730006128674443 Mon Sep 17 00:00:00 2001
From: Manish R Jain
Date: Tue, 20 Nov 2018 12:15:27 -0800
Subject: [PATCH 10/12] Add a TODO

---
 worker/groups.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/worker/groups.go b/worker/groups.go
index 287d7f15ea5..691c165b844 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -545,6 +545,8 @@ func (g *groupi) connToZeroLeader(addr string) *conn.Pool {
 }
 
 // TODO: This function needs to be refactored into smaller functions. It gets hard to reason about.
+// TODO: The updates have to be sent to the Zero leader, but membership updates can be received
+// from any Zero server. Let's break that up into two different endpoints.
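+// One possible split (illustrative names, not part of this patch): an UpdateMembership
+// RPC that workers send only to the Zero leader, and a StreamMembership endpoint that
+// any Zero server can serve.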
func (g *groupi) periodicMembershipUpdate() { defer g.closer.Done() // CLOSER:1 From 149e6b1aec0cbffad4bfeea4226b663c97ffda28 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 20 Nov 2018 14:08:21 -0800 Subject: [PATCH 11/12] Self review --- dgraph/cmd/alpha/run.go | 1 - dgraph/cmd/zero/oracle.go | 8 ++++---- dgraph/cmd/zero/raft.go | 3 +-- dgraph/cmd/zero/run.go | 1 - edgraph/server.go | 11 +---------- worker/config.go | 1 + worker/groups.go | 29 +++++------------------------ worker/task.go | 2 +- 8 files changed, 13 insertions(+), 43 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 4c6c1c420bd..277f7bc7278 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -94,7 +94,6 @@ they form a Raft group and provide synchronous replication. // OpenCensus flags. flag.Float64("trace", 1.0, "The ratio of queries to trace.") - // flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.") diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 16ec9712e6a..b6095acf065 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -283,10 +283,10 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { // NOTE: It is important that we continue retrying proposeTxn until we succeed. This should // happen, irrespective of what the user context timeout might be. We check for it before - // reaching this stage, but not that we're here, we have to ensure that the commit proposal goes - // through. Otherwise, we should block here forever. - // If we don't do this, we'll see txn violations in Jepsen, because we'll send out a MaxAssigned - // higher than a commit, which would cause newer txns to see older data. + // reaching this stage, but now that we're here, we have to ensure that the commit proposal goes + // through. Otherwise, we should block here forever. If we don't do this, we'll see txn + // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would + // cause newer txns to see older data. ctx = context.Background() // Use a new context with no timeout. // If this node stops being the leader, we want this proposal to not be forwarded to the leader, diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 6bcb6de2623..0f5d39f29c8 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -532,8 +532,7 @@ func (n *node) Run() { // TODO: Should we move this to the top? if rd.SoftState != nil { if rd.RaftState == raft.StateLeader && !leader { - // I've become the leader. - glog.Infoln("I've become the leader, updating leases") + glog.Infoln("I've become the leader, updating leases.") n.server.updateLeases() } leader = rd.RaftState == raft.StateLeader diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index ddae83a23fb..40fa14ef0fe 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -90,7 +90,6 @@ instances to achieve high-availability. // OpenCensus flags. 
flag.Float64("trace", 1.0, "The ratio of queries to trace.") - // flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.") flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") } diff --git a/edgraph/server.go b/edgraph/server.go index d8f7ecea0ec..a96da8e5afb 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -527,18 +527,9 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx if tc.StartTs == 0 { return &api.TxnContext{}, fmt.Errorf("StartTs cannot be zero while committing a transaction.") } + annotateStartTs(span, tc.StartTs) span.Annotatef(nil, "Txn Context received: %+v", tc) - var attrs []otrace.Attribute - for _, key := range tc.Keys { - attrs = append(attrs, otrace.StringAttribute("conflict", key)) - } - for _, pred := range tc.Preds { - attrs = append(attrs, otrace.StringAttribute("pred", pred)) - } - span.Annotate(attrs, "Conflict Keys and Preds") - - annotateStartTs(span, tc.StartTs) commitTs, err := worker.CommitOverNetwork(ctx, tc) if err == y.ErrAborted { tctx.Aborted = true diff --git a/worker/config.go b/worker/config.go index 773a070d5d1..81e8e375a18 100644 --- a/worker/config.go +++ b/worker/config.go @@ -24,6 +24,7 @@ type IPRange struct { type Options struct { ExportPath string NumPendingProposals int + // TODO: Get rid of this here. Tracing float64 MyAddr string ZeroAddr string diff --git a/worker/groups.go b/worker/groups.go index 691c165b844..6aaf2c3fa41 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc" "golang.org/x/net/context" - "golang.org/x/net/trace" "github.com/dgraph-io/badger" "github.com/dgraph-io/badger/y" @@ -97,7 +96,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { m := &pb.Member{Id: Config.RaftId, Addr: Config.MyAddr} var connState *pb.ConnectionState var err error - for { + for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 pl := gr.connToZeroLeader(Config.ZeroAddr) if pl == nil { continue @@ -108,21 +107,6 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { break } } - - // delay := 50 * time.Millisecond - // maxHalfDelay := 3 * time.Second - // var err error - // for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 - // connState, err = zc.Connect(gr.ctx, m) - // if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() { - // break - // } - // glog.Errorf("Error while connecting with group zero: %v", err) - // time.Sleep(delay) - // if delay <= maxHalfDelay { - // delay *= 2 - // } - // } x.CheckfNoTrace(err) if connState.GetMember() == nil || connState.GetState() == nil { x.Fatalf("Unable to join cluster via dgraphzero") @@ -521,7 +505,8 @@ func (g *groupi) connToZeroLeader(addr string) *conn.Pool { var pl *conn.Pool if len(addr) > 0 { pl = conn.Get().Connect(addr) - } else { + } + if pl == nil { pl = g.AnyServer(0) if pl == nil { glog.V(1).Infof("No healthy Zero server found. 
Retrying...") @@ -799,15 +784,12 @@ func (g *groupi) processOracleDeltaStream() { defer ticker.Stop() blockingReceiveAndPropose := func() { - elog := trace.NewEventLog("Dgraph", "ProcessOracleStream") - defer elog.Finish() glog.Infof("Leader idx=%d of group=%d is connecting to Zero for txn updates\n", g.Node.Id, g.groupId()) pl := g.Leader(0) if pl == nil { glog.Warningln("Oracle delta stream: No Zero leader known.") - elog.Errorf("Dgraph zero leader address unknown") time.Sleep(time.Second) return } @@ -826,7 +808,6 @@ func (g *groupi) processOracleDeltaStream() { stream, err := c.Oracle(ctx, &api.Payload{}) if err != nil { glog.Errorf("Error while calling Oracle %v\n", err) - elog.Errorf("Error while calling Oracle %v", err) time.Sleep(time.Second) return } @@ -907,9 +888,9 @@ func (g *groupi) processOracleDeltaStream() { sort.Slice(delta.Txns, func(i, j int) bool { return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs }) - elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta) if glog.V(2) { - glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:", batch, delta.MaxAssigned) + glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:", + batch, delta.MaxAssigned) for _, txn := range delta.Txns { if txn.CommitTs == 0 { glog.Infof("Aborted: %d", txn.StartTs) diff --git a/worker/task.go b/worker/task.go index 9e2aca9f0a1..f95bff251ab 100644 --- a/worker/task.go +++ b/worker/task.go @@ -598,7 +598,7 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro } if span != nil { maxAssigned := posting.Oracle().MaxAssigned() - span.Annotatef(nil, "Done waiting for maxAssigned to catch up for Attr %q, readTs: %d max: %d\n", + span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d", q.Attr, q.ReadTs, maxAssigned) } From e60bc7abfe91eb855d92792c2328258aa91ace50 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 20 Nov 2018 14:35:58 -0800 Subject: [PATCH 12/12] Self Review --- dgraph/cmd/zero/oracle.go | 4 ++++ dgraph/cmd/zero/raft.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index b6095acf065..17e738f517b 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -287,6 +287,10 @@ func (s *Server) proposeTxn(ctx context.Context, src *api.TxnContext) error { // through. Otherwise, we should block here forever. If we don't do this, we'll see txn // violations in Jepsen, because we'll send out a MaxAssigned higher than a commit, which would // cause newer txns to see older data. + + // We could consider adding a wrapper around the user proposal, so we can access any key-values. + // Something like this: + // https://github.com/golang/go/commit/5d39260079b5170e6b4263adb4022cc4b54153c4 ctx = context.Background() // Use a new context with no timeout. // If this node stops being the leader, we want this proposal to not be forwarded to the leader, diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index 0f5d39f29c8..cbc524f2efc 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -87,6 +87,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er defer n.Proposals.Delete(key) proposal.Key = key + // TODO: Remove this and use OpenCensus spans. if tr, ok := trace.FromContext(ctx); ok { tr.LazyPrintf("Proposing with key: %X", key) } @@ -529,7 +530,6 @@ func (n *node) Run() { n.Applied.Done(entry.Index) } - // TODO: Should we move this to the top? 
if rd.SoftState != nil { if rd.RaftState == raft.StateLeader && !leader { glog.Infoln("I've become the leader, updating leases.")
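
A closing note on the Zero-connection logic reworked across patches 08 and 11: connToZeroLeader retries with a sleep-then-double backoff until it reaches a Zero server. The standalone sketch below mirrors that pattern; the constants, names, and the dial stand-in are illustrative, and the real loop retries forever rather than bounding attempts.

package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithBackoff sleeps before each attempt and doubles the delay until it
// exceeds maxHalfDelay, capping the sleep at roughly twice that value.
func connectWithBackoff(dial func() error) error {
	delay := 100 * time.Millisecond // mirrors connBaseDelay
	maxHalfDelay := time.Second
	for attempt := 0; attempt < 10; attempt++ { // bounded so the example terminates
		time.Sleep(delay)
		if delay <= maxHalfDelay {
			delay *= 2
		}
		if err := dial(); err == nil {
			return nil
		}
	}
	return errors.New("no healthy Zero server found")
}

func main() {
	attempts := 0
	err := connectWithBackoff(func() error {
		attempts++
		if attempts < 4 {
			return errors.New("connection refused")
		}
		return nil
	})
	fmt.Printf("err=%v after %d attempts\n", err, attempts)
}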