diff --git a/conn/pool.go b/conn/pool.go
index 3c876bbedfb..31c41743935 100644
--- a/conn/pool.go
+++ b/conn/pool.go
@@ -28,6 +28,7 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/golang/glog"
+	"go.opencensus.io/plugin/ocgrpc"
 	"google.golang.org/grpc"
 )
 
@@ -124,6 +125,7 @@ func (p *Pools) Connect(addr string) *Pool {
 // NewPool creates a new "pool" with one gRPC connection, refcount 0.
 func NewPool(addr string) (*Pool, error) {
 	conn, err := grpc.Dial(addr,
+		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
 		grpc.WithDefaultCallOptions(
 			grpc.MaxCallRecvMsgSize(x.GrpcMaxSize),
 			grpc.MaxCallSendMsgSize(x.GrpcMaxSize)),
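Installing the OpenCensus stats handler at dial time is what lets client-side spans join the traces started on the server. A minimal sketch of the same wiring in isolation; the address and insecure transport are illustrative, not taken from this patch:

```go
package main

import (
	"log"

	"go.opencensus.io/plugin/ocgrpc"
	"google.golang.org/grpc"
)

func main() {
	// With ocgrpc.ClientHandler installed, every outgoing RPC records
	// latency/byte stats and starts a client span whose context is
	// propagated to the server in gRPC metadata.
	conn, err := grpc.Dial("localhost:7080", // placeholder address
		grpc.WithInsecure(),
		grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```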
diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go
index b604998fd86..277f7bc7278 100644
--- a/dgraph/cmd/alpha/run.go
+++ b/dgraph/cmd/alpha/run.go
@@ -44,8 +44,8 @@ import (
 	"github.com/spf13/cobra"
 	"go.opencensus.io/exporter/jaeger"
 	"go.opencensus.io/plugin/ocgrpc"
-	"go.opencensus.io/stats/view"
 	otrace "go.opencensus.io/trace"
+	"go.opencensus.io/zpages"
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
 	"google.golang.org/grpc"
@@ -92,7 +92,8 @@ they form a Raft group and provide synchronous replication.
 		"[mmap, disk] Specifies how Badger Value log is stored."+
 			" mmap consumes more RAM, but provides better performance.")
 
-	flag.String("jaeger.agent", "", "Send opencensus traces to Jaeger.")
+	// OpenCensus flags.
+	flag.Float64("trace", 1.0, "The ratio of queries to trace.")
 	flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
 
 	flag.StringP("wal", "w", "w", "Directory to store raft write-ahead logs.")
@@ -103,7 +104,6 @@ they form a Raft group and provide synchronous replication.
 	flag.String("export", "export", "Folder in which to store exports.")
 	flag.Int("pending_proposals", 256,
 		"Number of pending mutation proposals. Useful for rate limiting.")
-	flag.Float64("trace", 0.0, "The ratio of queries to trace.")
 	flag.String("my", "",
 		"IP_ADDRESS:PORT of this Dgraph Alpha, so other Dgraph Alphas can talk to this.")
 	flag.StringP("zero", "z", fmt.Sprintf("localhost:%d", x.PortZeroGrpc),
@@ -242,36 +242,13 @@ func setupListener(addr string, port int, reload func()) (net.Listener, error) {
 func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
 	defer wg.Done()
-	if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
-		glog.Fatalf("Unable to register opencensus: %v", err)
-	}
-
-	handler := &ocgrpc.ServerHandler{
-		IsPublicEndpoint: true,
-		StartOptions: otrace.StartOptions{
-			Sampler: otrace.AlwaysSample(),
-		},
-	}
-	opt := []grpc.ServerOption{
-		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
-		grpc.MaxSendMsgSize(x.GrpcMaxSize),
-		grpc.MaxConcurrentStreams(1000),
-		grpc.StatsHandler(handler),
-	}
-	if tlsCfg != nil {
-		opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
-	}
-	if agent := Alpha.Conf.GetString("jaeger.agent"); len(agent) > 0 {
+	if collector := Alpha.Conf.GetString("jaeger.collector"); len(collector) > 0 {
 		// Port details: https://www.jaegertracing.io/docs/getting-started/
-		// Default endpoints are:
-		// agentEndpointURI := "localhost:6831"
-		// collectorEndpointURI := "http://localhost:14268"
-		collector := Alpha.Conf.GetString("jaeger.collector")
+		// Default collectorEndpointURI := "http://localhost:14268"
 		je, err := jaeger.NewExporter(jaeger.Options{
-			AgentEndpoint: agent,
-			Endpoint:      collector,
-			ServiceName:   "dgraph.alpha",
+			Endpoint:    collector,
+			ServiceName: "dgraph.alpha",
 		})
 		if err != nil {
 			log.Fatalf("Failed to create the Jaeger exporter: %v", err)
@@ -279,6 +256,21 @@ func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
 		// And now finally register it as a Trace Exporter
 		otrace.RegisterExporter(je)
 	}
+	// Exclusively for stats, metrics, etc. Not for tracing.
+	// var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...)
+	// if err := view.Register(views...); err != nil {
+	// 	glog.Fatalf("Unable to register OpenCensus stats: %v", err)
+	// }
+
+	opt := []grpc.ServerOption{
+		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
+		grpc.MaxSendMsgSize(x.GrpcMaxSize),
+		grpc.MaxConcurrentStreams(1000),
+		grpc.StatsHandler(&ocgrpc.ServerHandler{}),
+	}
+	if tlsCfg != nil {
+		opt = append(opt, grpc.Creds(credentials.NewTLS(tlsCfg)))
+	}
 	s := grpc.NewServer(opt...)
 	api.RegisterDgraphServer(s, &edgraph.Server{})
@@ -350,12 +342,18 @@ func setupServer() {
 	http.HandleFunc("/alter", alterHandler)
 	http.HandleFunc("/health", healthCheck)
 	http.HandleFunc("/share", shareHandler)
+
+	// TODO: Figure out what this is for?
 	http.HandleFunc("/debug/store", storeStatsHandler)
+
 	http.HandleFunc("/admin/shutdown", shutDownHandler)
 	http.HandleFunc("/admin/backup", backupHandler)
 	http.HandleFunc("/admin/export", exportHandler)
 	http.HandleFunc("/admin/config/lru_mb", memoryLimitHandler)
+
+	// Add OpenCensus z-pages.
+	zpages.Handle(http.DefaultServeMux, "/z")
 	http.HandleFunc("/", homeHandler)
 	http.HandleFunc("/ui/keywords", keywordHandler)
@@ -430,6 +428,8 @@ func run() {
 			return true, true
 		}
 	}
+	otrace.ApplyConfig(otrace.Config{
+		DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})
 
 	// Posting will initialize index which requires schema. Hence, initialize
 	// schema before calling posting.Init().
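Two knobs above control tracing on the Alpha: the --trace ratio feeds a ProbabilitySampler, and --jaeger.collector selects where sampled spans go. A standalone sketch of that combination, with the ratio and endpoint as assumed example values:

```go
package main

import (
	"log"

	"go.opencensus.io/exporter/jaeger"
	otrace "go.opencensus.io/trace"
)

func main() {
	// 1.0 samples every query; 0.25 would sample roughly one in four.
	otrace.ApplyConfig(otrace.Config{
		DefaultSampler: otrace.ProbabilitySampler(1.0),
	})

	je, err := jaeger.NewExporter(jaeger.Options{
		Endpoint:    "http://localhost:14268", // default Jaeger collector port
		ServiceName: "dgraph.alpha",
	})
	if err != nil {
		log.Fatalf("Failed to create the Jaeger exporter: %v", err)
	}
	otrace.RegisterExporter(je)
}
```

Sampling is decided when a span starts, so the ratio applies per incoming query rather than per annotation.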
diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go
index 79fb7e9c83c..848ce2f6b1e 100644
--- a/dgraph/cmd/debug/run.go
+++ b/dgraph/cmd/debug/run.go
@@ -70,6 +70,21 @@ func init() {
 	flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
 }
 
+func toInt(o *pb.Posting) int {
+	from := types.Val{
+		Tid:   types.TypeID(o.ValType),
+		Value: o.Value,
+	}
+	out, err := types.Convert(from, types.StringID)
+	x.Check(err)
+	val := out.Value.(string)
+	a, err := strconv.Atoi(val)
+	if err != nil {
+		return 0
+	}
+	return a
+}
+
 func readAmount(txn *badger.Txn, uid uint64) int {
 	iopt := badger.DefaultIteratorOptions
 	iopt.AllVersions = true
@@ -86,16 +101,7 @@ func readAmount(txn *badger.Txn, uid uint64) int {
 		var times int
 		var amount int
 		err = pl.Iterate(math.MaxUint64, 0, func(o *pb.Posting) error {
-			from := types.Val{
-				Tid:   types.TypeID(o.ValType),
-				Value: o.Value,
-			}
-			out, err := types.Convert(from, types.StringID)
-			x.Check(err)
-			val := out.Value.(string)
-			a, err := strconv.Atoi(val)
-			x.Check(err)
-			amount = a
+			amount = toInt(o)
 			times++
 			return nil
 		})
@@ -155,7 +161,7 @@ func seekTotal(db *badger.DB, readTs uint64) int {
 	var total int
 	for uid, key := range keys {
 		a := readAmount(txn, uid)
-		fmt.Printf("uid: %-5d key: %d amount: %d\n", uid, key, a)
+		fmt.Printf("uid: %-5d %x key: %d amount: %d\n", uid, uid, key, a)
 		total += a
 	}
 	fmt.Printf("Total @ %d = %d\n", readTs, total)
@@ -192,6 +198,12 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
 	itr := txn.NewIterator(badger.DefaultIteratorOptions)
 	defer itr.Close()
 
+	type account struct {
+		Key int
+		Amt int
+	}
+	keys := make(map[uint64]*account)
+
 	var buf bytes.Buffer
 	fmt.Fprintf(&buf, "SHOWING all postings at %d\n", readTs)
 	for itr.Rewind(); itr.Valid(); itr.Next() {
@@ -204,15 +216,39 @@ func showAllPostingsAt(db *badger.DB, readTs uint64) {
 		if !pk.IsData() || pk.Attr == "_predicate_" {
 			continue
 		}
-		fmt.Fprintf(&buf, "\nkey: %+v hex: %x\n", pk, item.Key())
+
+		var acc *account
+		if pk.Attr == "key_0" || pk.Attr == "amount_0" {
+			var has bool
+			acc, has = keys[pk.Uid]
+			if !has {
+				acc = &account{}
+				keys[pk.Uid] = acc
+			}
+		}
+		fmt.Fprintf(&buf, " key: %+v hex: %x\n", pk, item.Key())
 		val, err := item.ValueCopy(nil)
 		x.Check(err)
 		var plist pb.PostingList
 		x.Check(plist.Unmarshal(val))
 
+		x.AssertTrue(len(plist.Postings) <= 1)
+		var num int
 		for _, p := range plist.Postings {
+			num = toInt(p)
 			appendPosting(&buf, p)
 		}
+		if num > 0 && acc != nil {
+			switch pk.Attr {
+			case "key_0":
+				acc.Key = num
+			case "amount_0":
+				acc.Amt = num
+			}
+		}
+	}
+	for uid, acc := range keys {
+		fmt.Fprintf(&buf, "Uid: %d %x Key: %d Amount: %d\n", uid, uid, acc.Key, acc.Amt)
 	}
 	fmt.Println(buf.String())
 }
diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go
index 40a4c691731..e2360a151ca 100644
--- a/dgraph/cmd/zero/assign.go
+++ b/dgraph/cmd/zero/assign.go
@@ -41,6 +41,7 @@ func (s *Server) updateLeases() {
 	s.nextLeaseId = s.state.MaxLeaseId + 1
 	s.nextTxnTs = s.state.MaxTxnTs + 1
 	startTs = s.nextTxnTs
+	glog.Infof("Updated Lease id: %d. Txn Ts: %d", s.nextLeaseId, s.nextTxnTs)
 	s.Unlock()
 	s.orc.updateStartTxnTs(startTs)
 }
Aborting.") src.Aborted = true return s.proposeTxn(ctx, src) } @@ -341,8 +367,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error { src.CommitTs = assigned.StartId // Mark the transaction as done, irrespective of whether the proposal succeeded or not. defer s.orc.doneUntil.Done(src.CommitTs) + span.Annotatef([]otrace.Attribute{otrace.Int64Attribute("commitTs", int64(src.CommitTs))}, + "Node Id: %d. Proposing TxnContext: %+v", s.Node.Id, src) if err := s.orc.commit(src); err != nil { + span.Annotatef(nil, "Found a conflict. Aborting.") + src.Aborted = true + } + if err := ctx.Err(); err != nil { + span.Annotatef(nil, "Aborting txn due to context timing out.") src.Aborted = true } // Propose txn should be used to set watermark as done. @@ -353,6 +386,9 @@ func (s *Server) CommitOrAbort(ctx context.Context, src *api.TxnContext) (*api.T if ctx.Err() != nil { return nil, ctx.Err() } + ctx, span := otrace.StartSpan(ctx, "Zero.CommitOrAbort") + defer span.End() + if !s.Node.AmLeader() { return nil, x.Errorf("Only leader can decide to commit or abort") } diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index e0071edc60e..cbc524f2efc 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -70,6 +70,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er } propose := func(timeout time.Duration) error { + if !n.AmLeader() { + return x.Errorf("Not Zero leader. Aborting proposal: %+v", proposal) + } cctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -84,6 +87,7 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er defer n.Proposals.Delete(key) proposal.Key = key + // TODO: Remove this and use OpenCensus spans. if tr, ok := trace.FromContext(ctx); ok { tr.LazyPrintf("Proposing with key: %X", key) } @@ -119,6 +123,9 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.ZeroProposal) er for err == errInternalRetry { err = propose(timeout) timeout *= 2 // Exponential backoff + if timeout > time.Minute { + timeout = 32 * time.Second + } } return err } @@ -523,9 +530,9 @@ func (n *node) Run() { n.Applied.Done(entry.Index) } - // TODO: Should we move this to the top? if rd.SoftState != nil { if rd.RaftState == raft.StateLeader && !leader { + glog.Infoln("I've become the leader, updating leases.") n.server.updateLeases() } leader = rd.RaftState == raft.StateLeader diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 1da356bfde6..40fa14ef0fe 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -27,6 +27,10 @@ import ( "syscall" "time" + "go.opencensus.io/exporter/jaeger" + "go.opencensus.io/plugin/ocgrpc" + otrace "go.opencensus.io/trace" + "go.opencensus.io/zpages" "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc" @@ -83,6 +87,10 @@ instances to achieve high-availability. flag.StringP("wal", "w", "zw", "Directory storing WAL.") flag.Duration("rebalance_interval", 8*time.Minute, "Interval for trying a predicate move.") flag.Bool("telemetry", true, "Send anonymous telemetry data to Dgraph devs.") + + // OpenCensus flags. 
+ flag.Float64("trace", 1.0, "The ratio of queries to trace.") + flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.") } func setupListener(addr string, port int, kind string) (listener net.Listener, err error) { @@ -98,13 +106,37 @@ type state struct { } func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) { + if collector := Zero.Conf.GetString("jaeger.collector"); len(collector) > 0 { + // Port details: https://www.jaegertracing.io/docs/getting-started/ + // Default collectorEndpointURI := "http://localhost:14268" + je, err := jaeger.NewExporter(jaeger.Options{ + Endpoint: collector, + ServiceName: "dgraph.zero", + }) + if err != nil { + log.Fatalf("Failed to create the Jaeger exporter: %v", err) + } + // And now finally register it as a Trace Exporter + otrace.RegisterExporter(je) + } + // Exclusively for stats, metrics, etc. Not for tracing. + // var views = append(ocgrpc.DefaultServerViews, ocgrpc.DefaultClientViews...) + // if err := view.Register(views...); err != nil { + // glog.Fatalf("Unable to register OpenCensus stats: %v", err) + // } + s := grpc.NewServer( grpc.MaxRecvMsgSize(x.GrpcMaxSize), grpc.MaxSendMsgSize(x.GrpcMaxSize), - grpc.MaxConcurrentStreams(1000)) + grpc.MaxConcurrentStreams(1000), + grpc.StatsHandler(&ocgrpc.ServerHandler{})) rc := pb.RaftContext{Id: opts.nodeId, Addr: opts.myAddr, Group: 0} m := conn.NewNode(&rc, store) + + // Zero followers should not be forwarding proposals to the leader, to avoid txn commits which + // were calculated in a previous Zero leader. + m.Cfg.DisableProposalForwarding = true st.rs = &conn.RaftServer{Node: m} st.node = &node{Node: m, ctx: context.Background(), stop: make(chan struct{})} @@ -163,6 +195,8 @@ func run() { } } grpc.EnableTracing = false + otrace.ApplyConfig(otrace.Config{ + DefaultSampler: otrace.ProbabilitySampler(Zero.Conf.GetFloat64("trace"))}) addr := "localhost" if opts.bindall { @@ -204,6 +238,7 @@ func run() { http.HandleFunc("/removeNode", st.removeNode) http.HandleFunc("/moveTablet", st.moveTablet) http.HandleFunc("/assignIds", st.assignUids) + zpages.Handle(http.DefaultServeMux, "/z") // This must be here. It does not work if placed before Grpc init. 
diff --git a/edgraph/server.go b/edgraph/server.go
index 1f1fd6f8cc9..a96da8e5afb 100644
--- a/edgraph/server.go
+++ b/edgraph/server.go
@@ -63,7 +63,6 @@ type ServerState struct {
 	needTs chan tsReq
 }
 
-// TODO(tzdybal) - remove global
 var State ServerState
 
 func InitServerState() {
@@ -338,6 +337,12 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
 		return resp, err
 	}
 
+	if len(mu.SetJson) > 0 {
+		span.Annotatef(nil, "Got JSON Mutation: %s", mu.SetJson)
+	} else if len(mu.SetNquads) > 0 {
+		span.Annotatef(nil, "Got NQuad Mutation: %s", mu.SetNquads)
+	}
+
 	if !isMutationAllowed(ctx) {
 		return nil, x.Errorf("No mutations allowed.")
 	}
@@ -383,8 +388,9 @@ func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (resp *api.Assign
 		Edges:   edges,
 		StartTs: mu.StartTs,
 	}
-	span.Annotate(nil, "Applying mutations")
+	span.Annotatef(nil, "Applying mutations: %+v", m)
 	resp.Context, err = query.ApplyMutations(ctx, m)
+	span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err)
 	if !mu.CommitNow {
 		if err == y.ErrConflict {
 			err = status.Error(codes.FailedPrecondition, err.Error())
@@ -494,6 +500,7 @@ func (s *Server) Query(ctx context.Context, req *api.Request) (resp *api.Respons
 		return resp, err
 	}
 	resp.Json = json
+	span.Annotatef(nil, "Response = %s", json)
 
 	gl := &api.Latency{
 		ParsingNs: uint64(l.Parsing.Nanoseconds()),
@@ -521,6 +528,8 @@ func (s *Server) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.Tx
 		return &api.TxnContext{}, fmt.Errorf("StartTs cannot be zero while committing a transaction.")
 	}
 	annotateStartTs(span, tc.StartTs)
+
+	span.Annotatef(nil, "Txn Context received: %+v", tc)
 	commitTs, err := worker.CommitOverNetwork(ctx, tc)
 	if err == y.ErrAborted {
 		tctx.Aborted = true
diff --git a/vendor/github.com/coreos/etcd/raft/README.md b/vendor/github.com/coreos/etcd/raft/README.md
index f485b839771..fde22b16519 100644
--- a/vendor/github.com/coreos/etcd/raft/README.md
+++ b/vendor/github.com/coreos/etcd/raft/README.md
@@ -25,12 +25,12 @@ This raft implementation is a full feature implementation of Raft protocol.
Feat - Membership changes - Leadership transfer extension - Efficient linearizable read-only queries served by both the leader and followers - - leader checks with quorum and bypasses Raft log before processing read-only queries - - followers asks leader to get a safe read index before processing read-only queries + - leader checks with quorum and bypasses Raft log before processing read-only queries + - followers asks leader to get a safe read index before processing read-only queries - More efficient lease-based linearizable read-only queries served by both the leader and followers - - leader bypasses Raft log and processing read-only queries locally - - followers asks leader to get a safe read index before processing read-only queries - - this approach relies on the clock of the all the machines in raft group + - leader bypasses Raft log and processing read-only queries locally + - followers asks leader to get a safe read index before processing read-only queries + - this approach relies on the clock of the all the machines in raft group This raft implementation also includes a few optional enhancements: @@ -112,7 +112,7 @@ After creating a Node, the user has a few responsibilities: First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2. -1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded. +1. Write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic writes then all of them can be written together. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded. 2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make leader write to disk in parallel with its followers (as explained at section 10.2.1 in Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop. diff --git a/vendor/github.com/coreos/etcd/raft/node.go b/vendor/github.com/coreos/etcd/raft/node.go index 5da1c1193b2..33a9db84001 100644 --- a/vendor/github.com/coreos/etcd/raft/node.go +++ b/vendor/github.com/coreos/etcd/raft/node.go @@ -15,10 +15,10 @@ package raft import ( + "context" "errors" pb "github.com/coreos/etcd/raft/raftpb" - "golang.org/x/net/context" ) type SnapshotStatus int @@ -319,7 +319,7 @@ func (n *node) run(r *raft) { r.Step(m) case m := <-n.recvc: // filter out response message from unknown From. 
- if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { + if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) // raft never returns an error } case cc := <-n.confc: @@ -334,6 +334,8 @@ func (n *node) run(r *raft) { switch cc.Type { case pb.ConfChangeAddNode: r.addNode(cc.NodeID) + case pb.ConfChangeAddLearnerNode: + r.addLearner(cc.NodeID) case pb.ConfChangeRemoveNode: // block incoming proposal when local node is // removed diff --git a/vendor/github.com/coreos/etcd/raft/progress.go b/vendor/github.com/coreos/etcd/raft/progress.go index 77c7b52efe3..ef3787db65d 100644 --- a/vendor/github.com/coreos/etcd/raft/progress.go +++ b/vendor/github.com/coreos/etcd/raft/progress.go @@ -48,6 +48,7 @@ type Progress struct { // When in ProgressStateSnapshot, leader should have sent out snapshot // before and stops sending any replication message. State ProgressStateType + // Paused is used in ProgressStateProbe. // When Paused is true, raft should pause sending replication message to this peer. Paused bool @@ -76,6 +77,9 @@ type Progress struct { // be freed by calling inflights.freeTo with the index of the last // received entry. ins *inflights + + // IsLearner is true if this progress is tracked for a learner. + IsLearner bool } func (pr *Progress) resetState(state ProgressStateType) { @@ -243,7 +247,8 @@ func (in *inflights) freeTo(to uint64) { return } - i, idx := 0, in.start + idx := in.start + var i int for i = 0; i < in.count; i++ { if to < in.buffer[idx] { // found the first large inflight break diff --git a/vendor/github.com/coreos/etcd/raft/raft.go b/vendor/github.com/coreos/etcd/raft/raft.go index 29f20398203..b4c0f0248ca 100644 --- a/vendor/github.com/coreos/etcd/raft/raft.go +++ b/vendor/github.com/coreos/etcd/raft/raft.go @@ -116,6 +116,10 @@ type Config struct { // used for testing right now. peers []uint64 + // learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster. + // learners only receives entries from the leader node. It does not vote or promote itself. + learners []uint64 + // ElectionTick is the number of Node.Tick invocations that must pass between // elections. That is, if a follower does not receive any message from the // leader of current term before ElectionTick has elapsed, it will become @@ -171,11 +175,22 @@ type Config struct { // If the clock drift is unbounded, leader might keep the lease longer than it // should (clock can move backward/pause without any bound). ReadIndex is not safe // in that case. + // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. ReadOnlyOption ReadOnlyOption // Logger is the logger used for raft log. For multinode which can host // multiple raft group, each raft group can have its own logger Logger Logger + + // DisableProposalForwarding set to true means that followers will drop + // proposals, rather than forwarding them to the leader. One use case for + // this feature would be in a situation where the Raft leader is used to + // compute the data of a proposal, for example, adding a timestamp from a + // hybrid logical clock to data in a monotonically increasing way. Forwarding + // should be disabled to prevent a follower with an innaccurate hybrid + // logical clock from assigning the timestamp and then forwarding the data + // to the leader. 
+ DisableProposalForwarding bool } func (c *Config) validate() error { @@ -203,6 +218,10 @@ func (c *Config) validate() error { c.Logger = raftLogger } + if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { + return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") + } + return nil } @@ -220,9 +239,13 @@ type raft struct { maxInflight int maxMsgSize uint64 prs map[uint64]*Progress + learnerPrs map[uint64]*Progress state StateType + // isLearner is true if the local raft node is a learner. + isLearner bool + votes map[uint64]bool msgs []pb.Message @@ -256,6 +279,7 @@ type raft struct { // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. randomizedElectionTimeout int + disableProposalForwarding bool tick func() step stepFunc @@ -273,32 +297,47 @@ func newRaft(c *Config) *raft { panic(err) // TODO(bdarnell) } peers := c.peers - if len(cs.Nodes) > 0 { - if len(peers) > 0 { + learners := c.learners + if len(cs.Nodes) > 0 || len(cs.Learners) > 0 { + if len(peers) > 0 || len(learners) > 0 { // TODO(bdarnell): the peers argument is always nil except in // tests; the argument should be removed and these tests should be // updated to specify their nodes through a snapshot. - panic("cannot specify both newRaft(peers) and ConfState.Nodes)") + panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)") } peers = cs.Nodes + learners = cs.Learners } r := &raft{ - id: c.ID, - lead: None, - raftLog: raftlog, - maxMsgSize: c.MaxSizePerMsg, - maxInflight: c.MaxInflightMsgs, - prs: make(map[uint64]*Progress), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: c.MaxSizePerMsg, + maxInflight: c.MaxInflightMsgs, + prs: make(map[uint64]*Progress), + learnerPrs: make(map[uint64]*Progress), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} } + for _, p := range learners { + if _, ok := r.prs[p]; ok { + panic(fmt.Sprintf("node %x is in both learner and peer list", p)) + } + r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + if r.id == p { + r.isLearner = true + } + } + if !isHardStateEqual(hs, emptyState) { r.loadState(hs) } @@ -332,10 +371,13 @@ func (r *raft) hardState() pb.HardState { func (r *raft) quorum() int { return len(r.prs)/2 + 1 } func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)) + nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs)) for id := range r.prs { nodes = append(nodes, id) } + for id := range r.learnerPrs { + nodes = append(nodes, id) + } sort.Sort(uint64Slice(nodes)) return nodes } @@ -343,10 +385,20 @@ func (r *raft) nodes() []uint64 { // send persists state to stable storage and then sends to its mailbox. 
func (r *raft) send(m pb.Message) { m.From = r.id - if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { + if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { if m.Term == 0 { - // PreVote RPCs are sent at a term other than our actual term, so the code - // that sends these messages is responsible for setting the term. + // All {pre-,}campaign messages need to have the term set when + // sending. + // - MsgVote: m.Term is the term the node is campaigning for, + // non-zero as we increment the term when campaigning. + // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was + // granted, non-zero for the same reason MsgVote is + // - MsgPreVote: m.Term is the term the node will campaign, + // non-zero as we use m.Term to indicate the next term we'll be + // campaigning for + // - MsgPreVoteResp: m.Term is the term received in the original + // MsgPreVote if the pre-vote was granted, non-zero for the + // same reasons MsgPreVote is panic(fmt.Sprintf("term should be set when sending %s", m.Type)) } } else { @@ -364,9 +416,17 @@ func (r *raft) send(m pb.Message) { r.msgs = append(r.msgs, m) } +func (r *raft) getProgress(id uint64) *Progress { + if pr, ok := r.prs[id]; ok { + return pr + } + + return r.learnerPrs[id] +} + // sendAppend sends RPC, with entries to the given peer. func (r *raft) sendAppend(to uint64) { - pr := r.prs[to] + pr := r.getProgress(to) if pr.IsPaused() { return } @@ -431,7 +491,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.prs[to].Match, r.raftLog.committed) + commit := min(r.getProgress(to).Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, @@ -442,15 +502,26 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { r.send(m) } +func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { + for id, pr := range r.prs { + f(id, pr) + } + + for id, pr := range r.learnerPrs { + f(id, pr) + } +} + // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { - for id := range r.prs { + r.forEachProgress(func(id uint64, _ *Progress) { if id == r.id { - continue + return } + r.sendAppend(id) - } + }) } // bcastHeartbeat sends RPC, without entries to all the peers. @@ -464,12 +535,12 @@ func (r *raft) bcastHeartbeat() { } func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { - for id := range r.prs { + r.forEachProgress(func(id uint64, _ *Progress) { if id == r.id { - continue + return } r.sendHeartbeat(id, ctx) - } + }) } // maybeCommit attempts to advance the commit index. Returns true if @@ -478,8 +549,8 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { func (r *raft) maybeCommit() bool { // TODO(bmizerany): optimize.. 
Currently naive mis := make(uint64Slice, 0, len(r.prs)) - for id := range r.prs { - mis = append(mis, r.prs[id].Match) + for _, p := range r.prs { + mis = append(mis, p.Match) } sort.Sort(sort.Reverse(mis)) mci := mis[r.quorum()-1] @@ -500,12 +571,13 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() r.votes = make(map[uint64]bool) - for id := range r.prs { - r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)} + r.forEachProgress(func(id uint64, pr *Progress) { + *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner} if id == r.id { - r.prs[id].Match = r.raftLog.lastIndex() + pr.Match = r.raftLog.lastIndex() } - } + }) + r.pendingConf = false r.readOnly = newReadOnly(r.readOnly.option) } @@ -517,7 +589,7 @@ func (r *raft) appendEntry(es ...pb.Entry) { es[i].Index = li + 1 + uint64(i) } r.raftLog.append(es...) - r.prs[r.id].maybeUpdate(r.raftLog.lastIndex()) + r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex()) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() } @@ -589,6 +661,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate + r.votes = make(map[uint64]bool) r.tick = r.tickElection r.state = StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) @@ -682,7 +755,6 @@ func (r *raft) Step(m pb.Message) error { case m.Term == 0: // local message case m.Term > r.Term: - lead := m.From if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { force := bytes.Equal(m.Context, []byte(campaignTransfer)) inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout @@ -693,7 +765,6 @@ func (r *raft) Step(m pb.Message) error { r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) return nil } - lead = None } switch { case m.Type == pb.MsgPreVote: @@ -707,7 +778,11 @@ func (r *raft) Step(m pb.Message) error { default: r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) - r.becomeFollower(m.Term, lead) + if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { + r.becomeFollower(m.Term, m.From) + } else { + r.becomeFollower(m.Term, None) + } } case m.Term < r.Term: @@ -757,12 +832,27 @@ func (r *raft) Step(m pb.Message) error { } case pb.MsgVote, pb.MsgPreVote: + if r.isLearner { + // TODO: learner may need to vote, in case of node down when confchange. + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + return nil + } // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should // always equal r.Term. if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)}) + // When responding to Msg{Pre,}Vote messages we include the term + // from the message, not the local term. 
To see why consider the + // case where a single node was previously partitioned away and + // it's local term is now of date. If we include the local term + // (recall that for pre-votes we don't update the local term), the + // (pre-)campaigning node on the other end will proceed to ignore + // the message (it ignores all out of date messages). + // The term in the original message and current local term are the + // same in the case of regular votes, but different for pre-votes. + r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) if m.Type == pb.MsgVote { // Only record real votes. r.electionElapsed = 0 @@ -771,7 +861,7 @@ func (r *raft) Step(m pb.Message) error { } else { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true}) + r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } default: @@ -836,10 +926,7 @@ func stepLeader(r *raft, m pb.Message) { r.readOnly.addRequest(r.raftLog.committed, m) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: - var ri uint64 - if r.checkQuorum { - ri = r.raftLog.committed - } + ri := r.raftLog.committed if m.From == None || m.From == r.id { // from local member r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } else { @@ -854,8 +941,8 @@ func stepLeader(r *raft, m pb.Message) { } // All other message types require a progress for m.From (pr). - pr, prOk := r.prs[m.From] - if !prOk { + pr := r.getProgress(m.From) + if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return } @@ -954,6 +1041,10 @@ func stepLeader(r *raft, m pb.Message) { } r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) case pb.MsgTransferLeader: + if pr.IsLearner { + r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) + return + } leadTransferee := m.From lastLeadTransferee := r.leadTransferee if lastLeadTransferee != None { @@ -1033,6 +1124,9 @@ func stepFollower(r *raft, m pb.Message) { if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return + } else if r.disableProposalForwarding { + r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) + return } m.To = r.lead r.send(m) @@ -1127,20 +1221,37 @@ func (r *raft) restore(s pb.Snapshot) bool { return false } + // The normal peer can't become learner. 
+ if !r.isLearner { + for _, id := range s.Metadata.ConfState.Learners { + if id == r.id { + r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term) + return false + } + } + } + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) r.prs = make(map[uint64]*Progress) - for _, n := range s.Metadata.ConfState.Nodes { + r.learnerPrs = make(map[uint64]*Progress) + r.restoreNode(s.Metadata.ConfState.Nodes, false) + r.restoreNode(s.Metadata.ConfState.Learners, true) + return true +} + +func (r *raft) restoreNode(nodes []uint64, isLearner bool) { + for _, n := range nodes { match, next := uint64(0), r.raftLog.lastIndex()+1 if n == r.id { match = next - 1 + r.isLearner = isLearner } - r.setProgress(n, match, next) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n]) + r.setProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n)) } - return true } // promotable indicates whether state machine can be promoted to leader, @@ -1151,18 +1262,46 @@ func (r *raft) promotable() bool { } func (r *raft) addNode(id uint64) { + r.addNodeOrLearnerNode(id, false) +} + +func (r *raft) addLearner(id uint64) { + r.addNodeOrLearnerNode(id, true) +} + +func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { r.pendingConf = false - if _, ok := r.prs[id]; ok { - // Ignore any redundant addNode calls (which can happen because the - // initial bootstrapping entries are applied twice). - return + pr := r.getProgress(id) + if pr == nil { + r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // can only change Learner to Voter + r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id) + return + } + + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } + + // change Learner to Voter, use origin Learner progress + delete(r.learnerPrs, id) + pr.IsLearner = false + r.prs[id] = pr + } + + if r.id == id { + r.isLearner = isLearner } - r.setProgress(id, 0, r.raftLog.lastIndex()+1) // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked // before the added node has a chance to communicate with us. - r.prs[id].RecentActive = true + pr = r.getProgress(id) + pr.RecentActive = true } func (r *raft) removeNode(id uint64) { @@ -1170,7 +1309,7 @@ func (r *raft) removeNode(id uint64) { r.pendingConf = false // do not try to commit or abort transferring if there is no nodes in the cluster. 
- if len(r.prs) == 0 { + if len(r.prs) == 0 && len(r.learnerPrs) == 0 { return } @@ -1187,12 +1326,22 @@ func (r *raft) removeNode(id uint64) { func (r *raft) resetPendingConf() { r.pendingConf = false } -func (r *raft) setProgress(id, match, next uint64) { - r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} +func (r *raft) setProgress(id, match, next uint64, isLearner bool) { + if !isLearner { + delete(r.learnerPrs, id) + r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} + return + } + + if _, ok := r.prs[id]; ok { + panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) + } + r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} } func (r *raft) delProgress(id uint64) { delete(r.prs, id) + delete(r.learnerPrs, id) } func (r *raft) loadState(state pb.HardState) { @@ -1222,18 +1371,18 @@ func (r *raft) resetRandomizedElectionTimeout() { func (r *raft) checkQuorumActive() bool { var act int - for id := range r.prs { + r.forEachProgress(func(id uint64, pr *Progress) { if id == r.id { // self is always active act++ - continue + return } - if r.prs[id].RecentActive { + if pr.RecentActive && !pr.IsLearner { act++ } - r.prs[id].RecentActive = false - } + pr.RecentActive = false + }) return act >= r.quorum() } diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go b/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go index 4c6e79d58a0..fd9ee3729ec 100644 --- a/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go +++ b/vendor/github.com/coreos/etcd/raft/raftpb/raft.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-gogo. +// Code generated by protoc-gen-gogo. DO NOT EDIT. // source: raft.proto -// DO NOT EDIT! /* Package raftpb is a generated protocol buffer package. 
@@ -26,6 +25,8 @@ import ( math "math" + _ "github.com/gogo/protobuf/gogoproto" + io "io" ) @@ -162,20 +163,23 @@ func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, type ConfChangeType int32 const ( - ConfChangeAddNode ConfChangeType = 0 - ConfChangeRemoveNode ConfChangeType = 1 - ConfChangeUpdateNode ConfChangeType = 2 + ConfChangeAddNode ConfChangeType = 0 + ConfChangeRemoveNode ConfChangeType = 1 + ConfChangeUpdateNode ConfChangeType = 2 + ConfChangeAddLearnerNode ConfChangeType = 3 ) var ConfChangeType_name = map[int32]string{ 0: "ConfChangeAddNode", 1: "ConfChangeRemoveNode", 2: "ConfChangeUpdateNode", + 3: "ConfChangeAddLearnerNode", } var ConfChangeType_value = map[string]int32{ - "ConfChangeAddNode": 0, - "ConfChangeRemoveNode": 1, - "ConfChangeUpdateNode": 2, + "ConfChangeAddNode": 0, + "ConfChangeRemoveNode": 1, + "ConfChangeUpdateNode": 2, + "ConfChangeAddLearnerNode": 3, } func (x ConfChangeType) Enum() *ConfChangeType { @@ -267,6 +271,7 @@ func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []in type ConfState struct { Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"` + Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -537,6 +542,13 @@ func (m *ConfState) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintRaft(dAtA, i, uint64(num)) } } + if len(m.Learners) > 0 { + for _, num := range m.Learners { + dAtA[i] = 0x10 + i++ + i = encodeVarintRaft(dAtA, i, uint64(num)) + } + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -579,24 +591,6 @@ func (m *ConfChange) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func encodeFixed64Raft(dAtA []byte, offset int, v uint64) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - dAtA[offset+4] = uint8(v >> 32) - dAtA[offset+5] = uint8(v >> 40) - dAtA[offset+6] = uint8(v >> 48) - dAtA[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Raft(dAtA []byte, offset int, v uint32) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - return offset + 4 -} func encodeVarintRaft(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -700,6 +694,11 @@ func (m *ConfState) Size() (n int) { n += 1 + sovRaft(uint64(e)) } } + if len(m.Learners) > 0 { + for _, e := range m.Learners { + n += 1 + sovRaft(uint64(e)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1619,6 +1618,68 @@ func (m *ConfState) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType) } + case 2: + if wireType == 0 { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Learners = append(m.Learners, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + packedLen + if postIndex > l { + return 
io.ErrUnexpectedEOF + } + for iNdEx < postIndex { + var v uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Learners = append(m.Learners, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType) + } default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) @@ -1888,55 +1949,56 @@ var ( func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) } var fileDescriptorRaft = []byte{ - // 790 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0x16, 0x29, 0xea, 0x6f, 0x28, 0xcb, 0xab, 0xb5, 0x5a, 0x2c, 0x0c, 0x43, 0x55, 0x85, 0x1e, - 0x04, 0x17, 0x76, 0x5b, 0x1d, 0x7a, 0xe8, 0xcd, 0x96, 0x0a, 0x58, 0x40, 0x65, 0xb8, 0xb2, 0xdc, - 0x43, 0x83, 0x20, 0x58, 0x8b, 0x2b, 0x4a, 0x89, 0xc9, 0x25, 0x96, 0x2b, 0xc7, 0xbe, 0x04, 0x79, - 0x80, 0x3c, 0x40, 0x2e, 0x79, 0x1f, 0x1f, 0x0d, 0xe4, 0x1e, 0xc4, 0xce, 0x8b, 0x04, 0xbb, 0x5c, - 0x4a, 0x94, 0x74, 0xdb, 0xf9, 0xbe, 0xe1, 0xcc, 0x37, 0xdf, 0xce, 0x12, 0x40, 0xd0, 0xa9, 0x3c, - 0x8e, 0x04, 0x97, 0x1c, 0x17, 0xd5, 0x39, 0xba, 0xde, 0x6f, 0xf8, 0xdc, 0xe7, 0x1a, 0xfa, 0x4d, - 0x9d, 0x12, 0xb6, 0xfd, 0x0e, 0x0a, 0x7f, 0x87, 0x52, 0xdc, 0xe3, 0x5f, 0xc1, 0x19, 0xdf, 0x47, - 0x8c, 0x58, 0x2d, 0xab, 0x53, 0xeb, 0xd6, 0x8f, 0x93, 0xaf, 0x8e, 0x35, 0xa9, 0x88, 0x53, 0xe7, - 0xe1, 0xcb, 0x4f, 0xb9, 0x91, 0x4e, 0xc2, 0x04, 0x9c, 0x31, 0x13, 0x01, 0xb1, 0x5b, 0x56, 0xc7, - 0x59, 0x32, 0x4c, 0x04, 0x78, 0x1f, 0x0a, 0x83, 0xd0, 0x63, 0x77, 0x24, 0x9f, 0xa1, 0x12, 0x08, - 0x63, 0x70, 0xfa, 0x54, 0x52, 0xe2, 0xb4, 0xac, 0x4e, 0x75, 0xa4, 0xcf, 0xed, 0xf7, 0x16, 0xa0, - 0xcb, 0x90, 0x46, 0xf1, 0x8c, 0xcb, 0x21, 0x93, 0xd4, 0xa3, 0x92, 0xe2, 0x3f, 0x01, 0x26, 0x3c, - 0x9c, 0xbe, 0x8a, 0x25, 0x95, 0x89, 0x22, 0x77, 0xa5, 0xa8, 0xc7, 0xc3, 0xe9, 0xa5, 0x22, 0x4c, - 0xf1, 0xca, 0x24, 0x05, 0x54, 0xf3, 0xb9, 0x6e, 0x9e, 0xd5, 0x95, 0x40, 0x4a, 0xb2, 0x54, 0x92, - 0xb3, 0xba, 0x34, 0xd2, 0xfe, 0x1f, 0xca, 0xa9, 0x02, 0x25, 0x51, 0x29, 0xd0, 0x3d, 0xab, 0x23, - 0x7d, 0xc6, 0x7f, 0x41, 0x39, 0x30, 0xca, 0x74, 0x61, 0xb7, 0x4b, 0x52, 0x2d, 0x9b, 0xca, 0x4d, - 0xdd, 0x65, 0x7e, 0xfb, 0x53, 0x1e, 0x4a, 0x43, 0x16, 0xc7, 0xd4, 0x67, 0xf8, 0x08, 0x1c, 0xb9, - 0x72, 0x78, 0x2f, 0xad, 0x61, 0xe8, 0xac, 0xc7, 0x2a, 0x0d, 0x37, 0xc0, 0x96, 0x7c, 0x6d, 0x12, - 0x5b, 0x72, 0x35, 0xc6, 0x54, 0xf0, 0x8d, 0x31, 0x14, 0xb2, 0x1c, 0xd0, 0xd9, 0x1c, 0x10, 0x37, - 0xa1, 0x74, 0xc3, 0x7d, 0x7d, 0x61, 0x85, 0x0c, 0x99, 0x82, 0x2b, 0xdb, 0x8a, 0xdb, 0xb6, 0x1d, - 0x41, 0x89, 0x85, 0x52, 0xcc, 0x59, 0x4c, 0x4a, 0xad, 0x7c, 0xc7, 0xed, 0xee, 0xac, 0x6d, 0x46, - 0x5a, 0xca, 0xe4, 0xe0, 0x03, 0x28, 0x4e, 0x78, 0x10, 0xcc, 0x25, 0x29, 0x67, 0x6a, 0x19, 0x0c, - 0x77, 0xa1, 0x1c, 0x1b, 0xc7, 0x48, 0x45, 0x3b, 0x89, 0x36, 0x9d, 0x4c, 0x1d, 0x4c, 0xf3, 0x54, - 0x45, 0xc1, 0x5e, 0xb3, 0x89, 0x24, 0xd0, 0xb2, 0x3a, 0xe5, 0xb4, 0x62, 0x82, 0xe1, 0x5f, 0x00, - 0x92, 0xd3, 0xd9, 0x3c, 0x94, 0xc4, 0xcd, 0xf4, 0xcc, 0xe0, 0x98, 0x40, 0x69, 0xc2, 0x43, 0xc9, - 0xee, 0x24, 0xa9, 0xea, 0x8b, 0x4d, 0xc3, 0xf6, 0x4b, 0xa8, 0x9c, 0x51, 0xe1, 0x25, 0xeb, 0x93, - 0x3a, 0x68, 0x6d, 0x39, 0x48, 0xc0, 0xb9, 0xe5, 0x92, 0xad, 0xef, 0xbb, 0x42, 0x32, 0x03, 0xe7, - 0xb7, 0x07, 0x6e, 0xff, 0x0c, 0x95, 0xe5, 0xba, 0xe2, 0x06, 0x14, 0x42, 0xee, 0xb1, 0x98, 0x58, - 0xad, 0x7c, 0xc7, 0x19, 0x25, 
0x41, 0xfb, 0x83, 0x05, 0xa0, 0x72, 0x7a, 0x33, 0x1a, 0xfa, 0xfa, - 0xd6, 0x07, 0xfd, 0x35, 0x05, 0xf6, 0xa0, 0x8f, 0x7f, 0x37, 0x8f, 0xd3, 0xd6, 0xab, 0xf3, 0x63, - 0xf6, 0x29, 0x24, 0xdf, 0x6d, 0xbd, 0xd0, 0x03, 0x28, 0x9e, 0x73, 0x8f, 0x0d, 0xfa, 0xeb, 0xba, - 0x12, 0x4c, 0x19, 0xd2, 0x33, 0x86, 0x24, 0x8f, 0x31, 0x0d, 0x0f, 0xff, 0x80, 0xca, 0xf2, 0xc9, - 0xe3, 0x5d, 0x70, 0x75, 0x70, 0xce, 0x45, 0x40, 0x6f, 0x50, 0x0e, 0xef, 0xc1, 0xae, 0x06, 0x56, - 0x8d, 0x91, 0x75, 0xf8, 0xd9, 0x06, 0x37, 0xb3, 0xc4, 0x18, 0xa0, 0x38, 0x8c, 0xfd, 0xb3, 0x45, - 0x84, 0x72, 0xd8, 0x85, 0xd2, 0x30, 0xf6, 0x4f, 0x19, 0x95, 0xc8, 0x32, 0xc1, 0x85, 0xe0, 0x11, - 0xb2, 0x4d, 0xd6, 0x49, 0x14, 0xa1, 0x3c, 0xae, 0x01, 0x24, 0xe7, 0x11, 0x8b, 0x23, 0xe4, 0x98, - 0xc4, 0xff, 0xb8, 0x64, 0xa8, 0xa0, 0x44, 0x98, 0x40, 0xb3, 0x45, 0xc3, 0xaa, 0x85, 0x41, 0x25, - 0x8c, 0xa0, 0xaa, 0x9a, 0x31, 0x2a, 0xe4, 0xb5, 0xea, 0x52, 0xc6, 0x0d, 0x40, 0x59, 0x44, 0x7f, - 0x54, 0xc1, 0x18, 0x6a, 0xc3, 0xd8, 0xbf, 0x0a, 0x05, 0xa3, 0x93, 0x19, 0xbd, 0xbe, 0x61, 0x08, - 0x70, 0x1d, 0x76, 0x4c, 0x21, 0x75, 0x41, 0x8b, 0x18, 0xb9, 0x26, 0xad, 0x37, 0x63, 0x93, 0x37, - 0xff, 0x2e, 0xb8, 0x58, 0x04, 0xa8, 0x8a, 0x7f, 0x80, 0xfa, 0x30, 0xf6, 0xc7, 0x82, 0x86, 0xf1, - 0x94, 0x89, 0x7f, 0x18, 0xf5, 0x98, 0x40, 0x3b, 0xe6, 0xeb, 0xf1, 0x3c, 0x60, 0x7c, 0x21, 0xcf, - 0xf9, 0x5b, 0x54, 0x33, 0x62, 0x46, 0x8c, 0x7a, 0xfa, 0x87, 0x87, 0x76, 0x8d, 0x98, 0x25, 0xa2, - 0xc5, 0x20, 0x33, 0xef, 0x85, 0x60, 0x7a, 0xc4, 0xba, 0xe9, 0x6a, 0x62, 0x9d, 0x83, 0x0f, 0x5f, - 0x40, 0x6d, 0xfd, 0x7a, 0x95, 0x8e, 0x15, 0x72, 0xe2, 0x79, 0xea, 0x2e, 0x51, 0x0e, 0x13, 0x68, - 0xac, 0xe0, 0x11, 0x0b, 0xf8, 0x2d, 0xd3, 0x8c, 0xb5, 0xce, 0x5c, 0x45, 0x1e, 0x95, 0x09, 0x63, - 0x9f, 0x92, 0x87, 0xa7, 0x66, 0xee, 0xf1, 0xa9, 0x99, 0x7b, 0x78, 0x6e, 0x5a, 0x8f, 0xcf, 0x4d, - 0xeb, 0xeb, 0x73, 0xd3, 0xfa, 0xf8, 0xad, 0x99, 0xfb, 0x1e, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x30, - 0x01, 0x41, 0x3a, 0x06, 0x00, 0x00, + // 815 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45, + 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38, + 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b, + 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20, + 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3, + 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9, + 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f, + 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77, + 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24, + 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37, + 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01, + 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03, + 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42, + 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21, + 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36, + 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 
0xd1, 0x28, 0xb0, 0x3d, 0xfb, + 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95, + 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02, + 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36, + 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20, + 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d, + 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d, + 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c, + 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3, + 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53, + 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa, + 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa, + 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0, + 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73, + 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb, + 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b, + 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67, + 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60, + 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70, + 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63, + 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1, + 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe, + 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc, + 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83, + 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21, + 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1, + 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6, + 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4, + 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65, + 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9, + 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa, + 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73, + 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0, + 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c, + 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8, + 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00, } diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto 
diff --git a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto b/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
index 806a43634fd..644ce7b8f2f 100644
--- a/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
+++ b/vendor/github.com/coreos/etcd/raft/raftpb/raft.proto
@@ -76,13 +76,15 @@ message HardState {
 }
 
 message ConfState {
-	repeated uint64 nodes = 1;
+	repeated uint64 nodes    = 1;
+	repeated uint64 learners = 2;
 }
 
 enum ConfChangeType {
-	ConfChangeAddNode    = 0;
-	ConfChangeRemoveNode = 1;
-	ConfChangeUpdateNode = 2;
+	ConfChangeAddNode        = 0;
+	ConfChangeRemoveNode     = 1;
+	ConfChangeUpdateNode     = 2;
+	ConfChangeAddLearnerNode = 3;
 }
 
 message ConfChange {
diff --git a/vendor/github.com/coreos/etcd/raft/rawnode.go b/vendor/github.com/coreos/etcd/raft/rawnode.go
index b950d5169a5..925cb851c4a 100644
--- a/vendor/github.com/coreos/etcd/raft/rawnode.go
+++ b/vendor/github.com/coreos/etcd/raft/rawnode.go
@@ -175,6 +175,8 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 	switch cc.Type {
 	case pb.ConfChangeAddNode:
 		rn.raft.addNode(cc.NodeID)
+	case pb.ConfChangeAddLearnerNode:
+		rn.raft.addLearner(cc.NodeID)
 	case pb.ConfChangeRemoveNode:
 		rn.raft.removeNode(cc.NodeID)
 	case pb.ConfChangeUpdateNode:
@@ -191,7 +193,7 @@ func (rn *RawNode) Step(m pb.Message) error {
 	if IsLocalMsg(m.Type) {
 		return ErrStepLocalMsg
 	}
-	if _, ok := rn.raft.prs[m.From]; ok || !IsResponseMsg(m.Type) {
+	if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
 		return rn.raft.Step(m)
 	}
 	return ErrStepPeerNotFound
diff --git a/vendor/github.com/coreos/etcd/raft/read_only.go b/vendor/github.com/coreos/etcd/raft/read_only.go
index d0085237e36..ae746fa73eb 100644
--- a/vendor/github.com/coreos/etcd/raft/read_only.go
+++ b/vendor/github.com/coreos/etcd/raft/read_only.go
@@ -18,7 +18,7 @@ import pb "github.com/coreos/etcd/raft/raftpb"
 
 // ReadState provides state for read only query.
 // It's caller's responsibility to call ReadIndex first before getting
-// this state from ready, It's also caller's duty to differentiate if this
+// this state from ready, it's also caller's duty to differentiate if this
 // state is what it requests through RequestCtx, eg. given a unique id as
 // RequestCtx
 type ReadState struct {
diff --git a/vendor/github.com/coreos/etcd/raft/status.go b/vendor/github.com/coreos/etcd/raft/status.go
index b690fa56b95..f4d3d86a4e3 100644
--- a/vendor/github.com/coreos/etcd/raft/status.go
+++ b/vendor/github.com/coreos/etcd/raft/status.go
@@ -28,11 +28,17 @@ type Status struct {
 	Applied  uint64
 	Progress map[uint64]Progress
+
+	LeadTransferee uint64
 }
 
 // getStatus gets a copy of the current raft status.
 func getStatus(r *raft) Status {
-	s := Status{ID: r.id}
+	s := Status{
+		ID:             r.id,
+		LeadTransferee: r.leadTransferee,
+	}
+
 	s.HardState = r.hardState()
 	s.SoftState = *r.softState()
 
@@ -43,6 +49,10 @@ func getStatus(r *raft) Status {
 		for id, p := range r.prs {
 			s.Progress[id] = *p
 		}
+
+		for id, p := range r.learnerPrs {
+			s.Progress[id] = *p
+		}
 	}
 
 	return s
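With the hunk above, Status.Progress covers learner replicas alongside voters, and LeadTransferee is populated while a leadership handoff is in flight. A minimal sketch of consuming the expanded Status — the report helper and package name are hypothetical:

    package raftx

    import (
    	"fmt"

    	"github.com/coreos/etcd/raft"
    )

    // report prints per-replica replication progress. After the change above,
    // the Progress map includes learners too, and LeadTransferee is non-zero
    // (raft.None == 0) during a leadership transfer.
    func report(node raft.Node) {
    	st := node.Status()
    	if st.LeadTransferee != raft.None {
    		fmt.Printf("leadership transferring to %x\n", st.LeadTransferee)
    	}
    	for id, pr := range st.Progress {
    		fmt.Printf("replica %x: match=%d next=%d state=%s\n", id, pr.Match, pr.Next, pr.State)
    	}
    }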
@@ -51,19 +61,21 @@
 // MarshalJSON translates the raft status into JSON.
 // TODO: try to simplify this by introducing ID type into raft
 func (s Status) MarshalJSON() ([]byte, error) {
-	j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"progress":{`,
-		s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState)
+	j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"applied":%d,"progress":{`,
+		s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState, s.Applied)
 
 	if len(s.Progress) == 0 {
-		j += "}}"
+		j += "},"
 	} else {
 		for k, v := range s.Progress {
 			subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%q},`, k, v.Match, v.Next, v.State)
 			j += subj
 		}
 		// remove the trailing ","
-		j = j[:len(j)-1] + "}}"
+		j = j[:len(j)-1] + "},"
 	}
+
+	j += fmt.Sprintf(`"leadtransferee":"%x"}`, s.LeadTransferee)
 	return []byte(j), nil
 }
diff --git a/vendor/vendor.json b/vendor/vendor.json
index c6d79c1bdd2..92fa0041bbe 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -141,20 +141,20 @@
 			"revisionTime": "2016-10-10T02:54:55Z"
 		},
 		{
-			"checksumSHA1": "h1nLibY0IliypSG0cwbXpSpcsMA=",
+			"checksumSHA1": "4VUg2Be1lkd0wm8iVTkoMTa58Ow=",
 			"path": "github.com/coreos/etcd/raft",
-			"revision": "c9504f61fc7f29b0ad30bf8bab02d9e1b600e962",
-			"revisionTime": "2018-06-15T16:40:41Z",
-			"version": "v3.2.23",
-			"versionExact": "v3.2.23"
+			"revision": "27fc7e2296f506182f58ce846e48f36b34fe6842",
+			"revisionTime": "2018-10-10T17:17:54Z",
+			"version": "v3.3.10",
+			"versionExact": "v3.3.10"
 		},
 		{
-			"checksumSHA1": "9UUP0nQdKxvJZOFg7e8FP4gzzgA=",
+			"checksumSHA1": "cwEnAGl7uzwDepjDZcIocMVEVEE=",
 			"path": "github.com/coreos/etcd/raft/raftpb",
-			"revision": "c9504f61fc7f29b0ad30bf8bab02d9e1b600e962",
-			"revisionTime": "2018-06-15T16:40:41Z",
-			"version": "v3.2.23",
-			"versionExact": "v3.2.23"
+			"revision": "27fc7e2296f506182f58ce846e48f36b34fe6842",
+			"revisionTime": "2018-10-10T17:17:54Z",
+			"version": "v3.3.10",
+			"versionExact": "v3.3.10"
 		},
 		{
 			"checksumSHA1": "Lf3uUXTkKK5DJ37BxQvxO1Fq+K8=",
@@ -307,10 +307,10 @@
 			"revisionTime": "2016-01-25T20:49:56Z"
 		},
 		{
-			"checksumSHA1": "GaJLoEuMGnP5ofXvuweAI4wx06U=",
+			"checksumSHA1": "MVW67aGwWsD1pGmL04HlQlaOfKE=",
 			"path": "github.com/golang/protobuf/proto",
-			"revision": "d3de96c4c28ef8af3aa1a892fc481e0f103c01ff",
-			"revisionTime": "2018-10-25T22:50:59Z"
+			"revision": "882cf97a83ad205fd22af574246a3bc647d7a7d2",
+			"revisionTime": "2018-11-20T00:18:57Z"
 		},
 		{
 			"checksumSHA1": "z4copNgeTN77OymdDKqLaIK/vSI=",
diff --git a/worker/config.go b/worker/config.go
index 773a070d5d1..81e8e375a18 100644
--- a/worker/config.go
+++ b/worker/config.go
@@ -24,6 +24,7 @@ type IPRange struct {
 type Options struct {
 	ExportPath          string
 	NumPendingProposals int
+	// TODO: Get rid of this here.
 	Tracing             float64
 	MyAddr              string
 	ZeroAddr            string
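Stepping back to the status.go hunk above: the serialized status now carries an "applied" field and a trailing "leadtransferee" key, which is why the empty-Progress branch emits "}," instead of closing the object. A small sketch of exercising it, with made-up values (the printStatusJSON helper and package name are illustrative):

    package raftx

    import (
    	"fmt"

    	"github.com/coreos/etcd/raft"
    )

    func printStatusJSON() {
    	// Illustrative values only; embedded HardState/SoftState are zero.
    	st := raft.Status{ID: 1, Applied: 42, LeadTransferee: 2}
    	b, err := st.MarshalJSON()
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(string(b))
    	// Should print something like:
    	// {"id":"1","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":42,"progress":{},"leadtransferee":"2"}
    }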
diff --git a/worker/groups.go b/worker/groups.go
index 7937aae18f2..6aaf2c3fa41 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -27,7 +27,6 @@ import (
 	"google.golang.org/grpc"
 
 	"golang.org/x/net/context"
-	"golang.org/x/net/trace"
 
 	"github.com/dgraph-io/badger"
 	"github.com/dgraph-io/badger/y"
@@ -92,25 +91,21 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
 	glog.Infof("Current Raft Id: %d\n", Config.RaftId)
 
 	// Successfully connect with dgraphzero, before doing anything else.
-	p := conn.Get().Connect(Config.ZeroAddr)
-	// Connect with dgraphzero and figure out what group we should belong to.
-	zc := pb.NewZeroClient(p.Get())
-	var connState *pb.ConnectionState
+	// Connect with Zero leader and figure out what group we should belong to.
 	m := &pb.Member{Id: Config.RaftId, Addr: Config.MyAddr}
-	delay := 50 * time.Millisecond
-	maxHalfDelay := 3 * time.Second
+	var connState *pb.ConnectionState
 	var err error
 	for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
+		pl := gr.connToZeroLeader(Config.ZeroAddr)
+		if pl == nil {
+			continue
+		}
+		zc := pb.NewZeroClient(pl.Get())
 		connState, err = zc.Connect(gr.ctx, m)
 		if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() {
 			break
 		}
-		glog.Errorf("Error while connecting with group zero: %v", err)
-		time.Sleep(delay)
-		if delay <= maxHalfDelay {
-			delay *= 2
-		}
 	}
 	x.CheckfNoTrace(err)
 	if connState.GetMember() == nil || connState.GetState() == nil {
@@ -121,8 +116,6 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
 	// This timestamp would be used for reading during snapshot after bulk load.
 	// The stream is async, we need this information before we start or else replica might
 	// not get any data.
-	// TODO: Do we really need this?
-	// posting.Oracle().SetMaxPending(connState.MaxPending)
 	gr.applyState(connState.GetState())
 
 	gid := gr.groupId()
@@ -352,7 +345,7 @@ func (g *groupi) Tablet(key string) *pb.Tablet {
 
 	// We don't know about this tablet.
 	// Check with dgraphzero if we can serve it.
-	pl := g.AnyServer(0)
+	pl := g.Leader(0)
 	if pl == nil {
 		return nil
 	}
@@ -492,7 +485,53 @@ func (g *groupi) triggerMembershipSync() {
 	}
 }
 
+const connBaseDelay = 100 * time.Millisecond
+
+func (g *groupi) connToZeroLeader(addr string) *conn.Pool {
+	pl := g.Leader(0)
+	if pl != nil {
+		return pl
+	}
+	glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...")
+
+	// No leader found. Let's get the latest membership state from Zero.
+	delay := connBaseDelay
+	maxHalfDelay := time.Second
+	for { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289
+		time.Sleep(delay)
+		if delay <= maxHalfDelay {
+			delay *= 2
+		}
+		var pl *conn.Pool
+		if len(addr) > 0 {
+			pl = conn.Get().Connect(addr)
+		}
+		if pl == nil {
+			pl = g.AnyServer(0)
+			if pl == nil {
+				glog.V(1).Infof("No healthy Zero server found. Retrying...")
+				continue
+			}
+		}
+		zc := pb.NewZeroClient(pl.Get())
+		connState, err := zc.Connect(gr.ctx, &pb.Member{ClusterInfoOnly: true})
+		if err != nil || connState == nil {
+			glog.V(1).Infof("While retrieving Zero leader info. Error: %v. Retrying...", err)
+			continue
+		}
+		for _, mz := range connState.State.GetZeros() {
+			if mz.Leader {
+				pl := conn.Get().Connect(mz.GetAddr())
+				return pl
+			}
+		}
+		glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
+	}
+}
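The retry cadence in connToZeroLeader above is a capped exponential backoff: sleep first, then double the delay until it crosses maxHalfDelay, after which it holds steady. A standalone sketch of just that schedule (the loop bound of six attempts is arbitrary, for demonstration):

    package main

    import (
    	"fmt"
    	"time"
    )

    // Mirrors connToZeroLeader's backoff arithmetic in isolation: delays run
    // 100ms, 200ms, 400ms, 800ms, 1.6s, 1.6s, ... bounded by 2*maxHalfDelay.
    func main() {
    	delay := 100 * time.Millisecond // connBaseDelay
    	maxHalfDelay := time.Second
    	for attempt := 1; attempt <= 6; attempt++ {
    		fmt.Printf("attempt %d: sleeping %v\n", attempt, delay)
    		if delay <= maxHalfDelay {
    			delay *= 2
    		}
    	}
    }

Sleeping before the first dial also spaces out the initial Connect calls, which presumably keeps a cluster-wide restart from hammering Zero all at once.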
 
 // TODO: This function needs to be refactored into smaller functions. It gets hard to reason about.
+// TODO: The updates have to be sent to the Zero leader, but membership updates can be received
+// from any Zero server. Let's break that up into two different endpoints.
 func (g *groupi) periodicMembershipUpdate() {
 	defer g.closer.Done() // CLOSER:1
 
@@ -514,14 +553,14 @@ START:
 		default:
 		}
 
-		pl := g.AnyServer(0)
+		pl := g.connToZeroLeader("")
 		// We should always have some connection to dgraphzero.
 		if pl == nil {
 			glog.Warningln("Membership update: No Zero server known.")
 			time.Sleep(time.Second)
 			goto START
 		}
-		glog.Infof("Got address of a Zero server: %s", pl.Addr)
+		glog.Infof("Got address of a Zero leader: %s", pl.Addr)
 
 		c := pb.NewZeroClient(pl.Get())
 		ctx, cancel := context.WithCancel(context.Background())
@@ -593,6 +632,9 @@ OUTER:
 			break OUTER
 		}
 	case <-slowTicker.C:
+		// TODO: Zero should have two different RPCs. One for receiving updates, and one for
+		// sending updates.
+
 		// dgraphzero just adds to the map so check that no data is present for the tablet
 		// before we remove it to avoid the race condition where a tablet is added recently
 		// and mutation has not been persisted to disk.
@@ -742,15 +784,12 @@ func (g *groupi) processOracleDeltaStream() {
 	defer ticker.Stop()
 
 	blockingReceiveAndPropose := func() {
-		elog := trace.NewEventLog("Dgraph", "ProcessOracleStream")
-		defer elog.Finish()
 		glog.Infof("Leader idx=%d of group=%d is connecting to Zero for txn updates\n",
 			g.Node.Id, g.groupId())
 
 		pl := g.Leader(0)
 		if pl == nil {
 			glog.Warningln("Oracle delta stream: No Zero leader known.")
-			elog.Errorf("Dgraph zero leader address unknown")
 			time.Sleep(time.Second)
 			return
 		}
@@ -769,7 +808,6 @@
 		stream, err := c.Oracle(ctx, &api.Payload{})
 		if err != nil {
 			glog.Errorf("Error while calling Oracle %v\n", err)
-			elog.Errorf("Error while calling Oracle %v", err)
 			time.Sleep(time.Second)
 			return
 		}
@@ -850,9 +888,16 @@
 		sort.Slice(delta.Txns, func(i, j int) bool {
 			return delta.Txns[i].CommitTs < delta.Txns[j].CommitTs
 		})
-		elog.Printf("Batched %d updates. Proposing Delta: %v.", batch, delta)
-		if glog.V(3) {
-			glog.Infof("Batched %d updates. Proposing Delta: %v.", batch, delta)
+		if glog.V(2) {
+			glog.Infof("Batched %d updates. Max Assigned: %d. Proposing Deltas:",
+				batch, delta.MaxAssigned)
+			for _, txn := range delta.Txns {
+				if txn.CommitTs == 0 {
+					glog.Infof("Aborted: %d", txn.StartTs)
+				} else {
+					glog.Infof("Committed: %d -> %d", txn.StartTs, txn.CommitTs)
+				}
+			}
 		}
 
 		for { // Block forever trying to propose this.
diff --git a/worker/task.go b/worker/task.go
index cdaf3d343e4..f95bff251ab 100644
--- a/worker/task.go
+++ b/worker/task.go
@@ -592,13 +592,14 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
 
 // processTask processes the query, accumulates and returns the result.
 func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) {
 	span := otrace.FromContext(ctx)
-	span.Annotate(nil, "Waiting for startTs")
+	span.Annotatef(nil, "Waiting for startTs: %d", q.ReadTs)
 	if err := posting.Oracle().WaitForTs(ctx, q.ReadTs); err != nil {
 		return &emptyResult, err
 	}
 	if span != nil {
-		span.Annotatef(nil, "Done waiting for maxPending to catch up for Attr %q, readTs: %d\n",
-			q.Attr, q.ReadTs)
+		maxAssigned := posting.Oracle().MaxAssigned()
+		span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d",
+			q.Attr, q.ReadTs, maxAssigned)
 	}
 
 	// If a group stops serving tablet and it gets partitioned away from group zero, then it