diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go
index 08acd79b415..7e0d66784d3 100644
--- a/dgraph/cmd/zero/assign.go
+++ b/dgraph/cmd/zero/assign.go
@@ -26,10 +26,7 @@ import (
 	"github.com/golang/glog"
 )
 
-var (
-	emptyNum         pb.Num
-	emptyAssignedIds pb.AssignedIds
-)
+var emptyAssignedIds pb.AssignedIds
 
 const (
 	leaseBandwidth = uint64(10000)
diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go
index 40164d7c6a9..e4eddf9be32 100644
--- a/dgraph/cmd/zero/http.go
+++ b/dgraph/cmd/zero/http.go
@@ -22,7 +22,6 @@ import (
 	"net"
 	"net/http"
 	"strconv"
-	"sync"
 	"time"
 
 	"github.com/dgraph-io/dgraph/protos/pb"
@@ -224,7 +223,7 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) {
+func (st *state) serveHTTP(l net.Listener) {
 	srv := &http.Server{
 		ReadTimeout:  10 * time.Second,
 		WriteTimeout: 600 * time.Second,
@@ -232,7 +231,7 @@ func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) {
 	}
 
 	go func() {
-		defer wg.Done()
+		defer st.zero.closer.Done()
 		err := srv.Serve(l)
 		glog.Errorf("Stopped taking more http(s) requests. Err: %v", err)
 		ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second)
diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go
index 047bef4493b..9e5c9dcf0d9 100644
--- a/dgraph/cmd/zero/oracle.go
+++ b/dgraph/cmd/zero/oracle.go
@@ -448,7 +448,7 @@ func (s *Server) Oracle(unused *api.Payload, server pb.Zero_OracleServer) error
 			}
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-s.shutDownCh:
+		case <-s.closer.HasBeenClosed():
 			return errServerShutDown
 		}
 	}
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index f687f19f7d3..525db6439bc 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -43,7 +43,7 @@ type node struct {
 	ctx         context.Context
 	reads       map[uint64]chan uint64
 	subscribers map[uint32]chan struct{}
-	stop        chan struct{} // to send stop signal to Run
+	closer      *y.Closer // to stop Run.
 
 	// The last timestamp when this Zero was able to reach quorum.
 	mu sync.RWMutex
@@ -561,7 +561,11 @@ func (n *node) Run() {
 	// it in goroutine
 	readStateCh := make(chan raft.ReadState, 10)
 	closer := y.NewCloser(4)
-	defer closer.SignalAndWait()
+	defer func() {
+		closer.SignalAndWait()
+		n.closer.Done()
+		glog.Infof("Zero Node.Run finished.")
+	}()
 
 	go n.snapshotPeriodically(closer)
 	go n.updateZeroMembershipPeriodically(closer)
@@ -572,7 +576,7 @@
 
 	for {
 		select {
-		case <-n.stop:
+		case <-n.closer.HasBeenClosed():
 			n.Raft().Stop()
 			return
 		case <-ticker.C:
diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go
index 544df4400ad..9fe5a1eea9b 100644
--- a/dgraph/cmd/zero/run.go
+++ b/dgraph/cmd/zero/run.go
@@ -23,7 +23,6 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"
 
@@ -36,6 +35,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/dgraph-io/badger"
+	"github.com/dgraph-io/badger/y"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/raftwal"
@@ -105,7 +105,7 @@ type state struct {
 	zero *Server
 }
 
-func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) {
+func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) {
 	if collector := Zero.Conf.GetString("jaeger.collector"); len(collector) > 0 {
 		// Port details: https://www.jaegertracing.io/docs/getting-started/
 		// Default collectorEndpointURI := "http://localhost:14268"
@@ -139,7 +139,7 @@
 	m.Cfg.DisableProposalForwarding = true
 
 	st.rs = &conn.RaftServer{Node: m}
-	st.node = &node{Node: m, ctx: context.Background(), stop: make(chan struct{})}
+	st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)}
 	st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node}
 	st.zero.Init()
 	st.node.server = st.zero
@@ -148,10 +148,9 @@
 	pb.RegisterRaftServer(s, st.rs)
 
 	go func() {
-		defer wg.Done()
+		defer st.zero.closer.Done()
 		err := s.Serve(l)
-		glog.Infof("gRpc server stopped : %v", err)
-		st.node.stop <- struct{}{}
+		glog.Infof("gRPC server stopped : %v", err)
 
 		// Attempt graceful stop (waits for pending RPCs), but force a stop if
 		// it doesn't happen in a reasonable amount of time.
@@ -228,12 +227,10 @@ func run() {
 	defer kv.Close()
 	store := raftwal.Init(kv, opts.nodeId, 0)
 
-	var wg sync.WaitGroup
-	wg.Add(3)
 	// Initialize the servers.
 	var st state
-	st.serveGRPC(grpcListener, &wg, store)
-	st.serveHTTP(httpListener, &wg)
+	st.serveGRPC(grpcListener, store)
+	st.serveHTTP(httpListener)
 
 	http.HandleFunc("/state", st.getState)
 	http.HandleFunc("/removeNode", st.removeNode)
@@ -251,18 +248,40 @@
 	sdCh := make(chan os.Signal, 1)
 	signal.Notify(sdCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+	// handle signals
 	go func() {
-		defer wg.Done()
-		<-sdCh
-		glog.Infof("Shutting down...")
+		for {
+			select {
+			case sig, ok := <-sdCh:
+				if !ok {
+					return
+				}
+				glog.Infof("--- Received %s signal", sig)
+				signal.Stop(sdCh)
+				st.zero.closer.Signal()
+			}
+		}
+	}()
+
+	st.zero.closer.AddRunning(1)
+
+	go func() {
+		defer st.zero.closer.Done()
+		<-st.zero.closer.HasBeenClosed()
+		glog.Infoln("Shutting down...")
+		close(sdCh)
 		// Close doesn't close already opened connections.
+
+		// Stop all HTTP requests.
 		httpListener.Close()
+		// Stop Raft.
+		st.node.closer.SignalAndWait()
+		// Stop all internal requests.
 		grpcListener.Close()
-		close(st.zero.shutDownCh)
 		st.node.trySnapshot(0)
 	}()
 
-	glog.Infof("Running Dgraph Zero...")
-	wg.Wait()
-	glog.Infof("All done.")
+	glog.Infoln("Running Dgraph Zero...")
+	st.zero.closer.Wait()
+	glog.Infoln("All done.")
 }
diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go
index f7e465702e1..67a0fe13c05 100644
--- a/dgraph/cmd/zero/zero.go
+++ b/dgraph/cmd/zero/zero.go
@@ -25,6 +25,7 @@ import (
 	otrace "go.opencensus.io/trace"
 	"golang.org/x/net/context"
 
+	"github.com/dgraph-io/badger/y"
 	"github.com/dgraph-io/dgo/protos/api"
 	"github.com/dgraph-io/dgraph/conn"
 	"github.com/dgraph-io/dgraph/protos/pb"
@@ -56,8 +57,8 @@ type Server struct {
 	// groupMap    map[uint32]*Group
 	nextGroup      uint32
 	leaderChangeCh chan struct{}
-	shutDownCh     chan struct{} // Used to tell stream to close.
-	connectLock    sync.Mutex    // Used to serialize connect requests from servers.
+	closer         *y.Closer  // Used to tell stream to close.
+	connectLock    sync.Mutex // Used to serialize connect requests from servers.
 
 	blockCommitsOn map[string]struct{}
 }
@@ -76,7 +77,7 @@ func (s *Server) Init() {
 	s.nextTxnTs = 1
 	s.nextGroup = 1
 	s.leaderChangeCh = make(chan struct{}, 1)
-	s.shutDownCh = make(chan struct{}, 1)
+	s.closer = y.NewCloser(2) // grpc and http
 	s.blockCommitsOn = make(map[string]struct{})
 	go s.rebalanceTablets()
 }
@@ -614,7 +615,7 @@ func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershi
 			}
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-s.shutDownCh:
+		case <-s.closer.HasBeenClosed():
 			return errServerShutDown
 		}
 	}
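
For reference, the y.Closer pattern this diff standardizes on pairs a WaitGroup with a shared "closed" channel: goroutines register via NewCloser or AddRunning, watch HasBeenClosed() to learn when to exit, and call Done() on the way out, so a single SignalAndWait() (or Signal() followed by Wait()) replaces the old mix of sync.WaitGroup, the node stop channel, and shutDownCh. Below is a minimal, self-contained sketch of that lifecycle; the worker function, IDs, and timings are illustrative and not part of this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/dgraph-io/badger/y"
)

// worker loops until the closer is signalled, mirroring how Run(),
// serveGRPC(), and serveHTTP() wait on HasBeenClosed() in this diff.
func worker(id int, closer *y.Closer) {
	defer closer.Done() // Mark this goroutine as finished on exit.

	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			fmt.Printf("worker %d: shutting down\n", id)
			return
		case <-ticker.C:
			// Periodic work would go here.
		}
	}
}

func main() {
	// Register two goroutines up front, as Server.Init does with
	// y.NewCloser(2) for the gRPC and HTTP servers.
	closer := y.NewCloser(2)
	go worker(1, closer)
	go worker(2, closer)

	// Goroutines can also be registered after construction, as run()
	// does with st.zero.closer.AddRunning(1) for the shutdown handler.
	closer.AddRunning(1)
	go worker(3, closer)

	time.Sleep(200 * time.Millisecond)

	// Signal() closes the channel returned by HasBeenClosed(); Wait()
	// blocks until every registered goroutine has called Done().
	closer.SignalAndWait()
	fmt.Println("all workers stopped")
}
```

One consequence of this design, visible in run.go above: shutdown can be triggered from any place that holds the closer (a signal handler, a failed server, a test), and the ordering of teardown steps lives in one goroutine instead of being threaded through channel sends between components.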