From 02ac08a70155b277f7eb9d23fe3e3fab9a0fb46f Mon Sep 17 00:00:00 2001 From: srfrog Date: Wed, 23 Jan 2019 16:23:57 -0700 Subject: [PATCH 1/7] added HTTP command /shutdown --- dgraph/cmd/zero/http.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index 40164d7c6a9..8ad944b4f73 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -224,6 +224,21 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) { } } +func (st *state) shutdown(w http.ResponseWriter, r *http.Request) { + x.AddCorsHeaders(w) + if r.Method == "OPTIONS" { + return + } + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusBadRequest) + x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") + return + } + + st.zero.shutDownCh <- struct{}{} + w.Write([]byte("Server is shutting down")) +} + func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) { srv := &http.Server{ ReadTimeout: 10 * time.Second, From 861bc99ef7c1f7f822d33512b864e50a3ccb1d1a Mon Sep 17 00:00:00 2001 From: srfrog Date: Wed, 23 Jan 2019 16:24:45 -0700 Subject: [PATCH 2/7] handle http requests to shutdown, changed signal handling for clean service shutdown --- dgraph/cmd/zero/run.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 544df4400ad..356024c2f48 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -239,6 +239,7 @@ func run() { http.HandleFunc("/removeNode", st.removeNode) http.HandleFunc("/moveTablet", st.moveTablet) http.HandleFunc("/assign", st.assign) + http.HandleFunc("/shutdown", st.shutdown) zpages.Handle(http.DefaultServeMux, "/z") // This must be here. It does not work if placed before Grpc init. @@ -251,18 +252,34 @@ func run() { sdCh := make(chan os.Signal, 1) signal.Notify(sdCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + // handle signals + go func() { + for { + select { + case sig, ok := <-sdCh: + if !ok { + return + } + glog.Infof("--- Received %s signal", sig) + signal.Stop(sdCh) + st.zero.shutDownCh <- struct{}{} + } + } + }() + go func() { defer wg.Done() - <-sdCh - glog.Infof("Shutting down...") + <-st.zero.shutDownCh + glog.Infoln("Shutting down...") + close(st.zero.shutDownCh) + close(sdCh) // Close doesn't close already opened connections. httpListener.Close() grpcListener.Close() - close(st.zero.shutDownCh) st.node.trySnapshot(0) }() - glog.Infof("Running Dgraph Zero...") + glog.Infoln("Running Dgraph Zero...") wg.Wait() - glog.Infof("All done.") + glog.Infoln("All done.") } From 9d028ba3d08b23d575f02bfabf2bd36c15b462b3 Mon Sep 17 00:00:00 2001 From: srfrog Date: Wed, 23 Jan 2019 16:25:44 -0700 Subject: [PATCH 3/7] removed unused global var --- dgraph/cmd/zero/assign.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dgraph/cmd/zero/assign.go b/dgraph/cmd/zero/assign.go index 08acd79b415..7e0d66784d3 100644 --- a/dgraph/cmd/zero/assign.go +++ b/dgraph/cmd/zero/assign.go @@ -26,10 +26,7 @@ import ( "github.com/golang/glog" ) -var ( - emptyNum pb.Num - emptyAssignedIds pb.AssignedIds -) +var emptyAssignedIds pb.AssignedIds const ( leaseBandwidth = uint64(10000) From 0ab18590d2aeb97c63b590454906f0320c5d9811 Mon Sep 17 00:00:00 2001 From: srfrog Date: Wed, 23 Jan 2019 16:31:03 -0700 Subject: [PATCH 4/7] minor cosmetic tweaks --- dgraph/cmd/zero/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index 8ad944b4f73..65633a453e7 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -236,7 +236,7 @@ func (st *state) shutdown(w http.ResponseWriter, r *http.Request) { } st.zero.shutDownCh <- struct{}{} - w.Write([]byte("Server is shutting down")) + w.Write([]byte("Server is shutting down...\n")) } func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) { From 0d75fcc29179afcc16e938303b00db4d2df5b71d Mon Sep 17 00:00:00 2001 From: srfrog Date: Thu, 24 Jan 2019 20:28:11 -0700 Subject: [PATCH 5/7] switched to y.Closer --- dgraph/cmd/zero/http.go | 6 +++--- dgraph/cmd/zero/run.go | 21 ++++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index 65633a453e7..c1398c00778 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -22,9 +22,9 @@ import ( "net" "net/http" "strconv" - "sync" "time" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/gogo/protobuf/jsonpb" @@ -239,7 +239,7 @@ func (st *state) shutdown(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Server is shutting down...\n")) } -func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) { +func (st *state) serveHTTP(l net.Listener, closer *y.Closer) { srv := &http.Server{ ReadTimeout: 10 * time.Second, WriteTimeout: 600 * time.Second, @@ -247,7 +247,7 @@ func (st *state) serveHTTP(l net.Listener, wg *sync.WaitGroup) { } go func() { - defer wg.Done() + defer closer.Done() err := srv.Serve(l) glog.Errorf("Stopped taking more http(s) requests. Err: %v", err) ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second) diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 356024c2f48..80048e86de9 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -23,7 +23,6 @@ import ( "net/http" "os" "os/signal" - "sync" "syscall" "time" @@ -36,6 +35,7 @@ import ( "google.golang.org/grpc" "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" @@ -105,7 +105,7 @@ type state struct { zero *Server } -func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) { +func (st *state) serveGRPC(l net.Listener, closer *y.Closer, store *raftwal.DiskStorage) { if collector := Zero.Conf.GetString("jaeger.collector"); len(collector) > 0 { // Port details: https://www.jaegertracing.io/docs/getting-started/ // Default collectorEndpointURI := "http://localhost:14268" @@ -148,7 +148,7 @@ func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.Di pb.RegisterRaftServer(s, st.rs) go func() { - defer wg.Done() + defer closer.Done() err := s.Serve(l) glog.Infof("gRpc server stopped : %v", err) st.node.stop <- struct{}{} @@ -228,12 +228,12 @@ func run() { defer kv.Close() store := raftwal.Init(kv, opts.nodeId, 0) - var wg sync.WaitGroup - wg.Add(3) + closer := y.NewCloser(2) + // Initialize the servers. var st state - st.serveGRPC(grpcListener, &wg, store) - st.serveHTTP(httpListener, &wg) + st.serveGRPC(grpcListener, closer, store) + st.serveHTTP(httpListener, closer) http.HandleFunc("/state", st.getState) http.HandleFunc("/removeNode", st.removeNode) @@ -252,8 +252,11 @@ func run() { sdCh := make(chan os.Signal, 1) signal.Notify(sdCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + closer.AddRunning(2) + // handle signals go func() { + defer closer.Done() for { select { case sig, ok := <-sdCh: @@ -268,7 +271,7 @@ func run() { }() go func() { - defer wg.Done() + defer closer.Done() <-st.zero.shutDownCh glog.Infoln("Shutting down...") close(st.zero.shutDownCh) @@ -280,6 +283,6 @@ func run() { }() glog.Infoln("Running Dgraph Zero...") - wg.Wait() + closer.SignalAndWait() glog.Infoln("All done.") } From e45c4097d919ea5c437eef86333766bc04a39ad2 Mon Sep 17 00:00:00 2001 From: srfrog Date: Thu, 24 Jan 2019 20:48:21 -0700 Subject: [PATCH 6/7] simplified closer usage --- dgraph/cmd/zero/http.go | 7 +++---- dgraph/cmd/zero/oracle.go | 2 +- dgraph/cmd/zero/run.go | 25 ++++++++++--------------- dgraph/cmd/zero/zero.go | 9 +++++---- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index c1398c00778..bae6c1b8f7f 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -24,7 +24,6 @@ import ( "strconv" "time" - "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" "github.com/gogo/protobuf/jsonpb" @@ -235,11 +234,11 @@ func (st *state) shutdown(w http.ResponseWriter, r *http.Request) { return } - st.zero.shutDownCh <- struct{}{} + st.zero.closer.Signal() w.Write([]byte("Server is shutting down...\n")) } -func (st *state) serveHTTP(l net.Listener, closer *y.Closer) { +func (st *state) serveHTTP(l net.Listener) { srv := &http.Server{ ReadTimeout: 10 * time.Second, WriteTimeout: 600 * time.Second, @@ -247,7 +246,7 @@ func (st *state) serveHTTP(l net.Listener, closer *y.Closer) { } go func() { - defer closer.Done() + defer st.zero.closer.Done() err := srv.Serve(l) glog.Errorf("Stopped taking more http(s) requests. Err: %v", err) ctx, cancel := context.WithTimeout(context.Background(), 630*time.Second) diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 047bef4493b..9e5c9dcf0d9 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -448,7 +448,7 @@ func (s *Server) Oracle(unused *api.Payload, server pb.Zero_OracleServer) error } case <-ctx.Done(): return ctx.Err() - case <-s.shutDownCh: + case <-s.closer.HasBeenClosed(): return errServerShutDown } } diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 80048e86de9..743e8764ac8 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc" "github.com/dgraph-io/badger" - "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" @@ -105,7 +104,7 @@ type state struct { zero *Server } -func (st *state) serveGRPC(l net.Listener, closer *y.Closer, store *raftwal.DiskStorage) { +func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { if collector := Zero.Conf.GetString("jaeger.collector"); len(collector) > 0 { // Port details: https://www.jaegertracing.io/docs/getting-started/ // Default collectorEndpointURI := "http://localhost:14268" @@ -148,7 +147,7 @@ func (st *state) serveGRPC(l net.Listener, closer *y.Closer, store *raftwal.Disk pb.RegisterRaftServer(s, st.rs) go func() { - defer closer.Done() + defer st.zero.closer.Done() err := s.Serve(l) glog.Infof("gRpc server stopped : %v", err) st.node.stop <- struct{}{} @@ -228,12 +227,10 @@ func run() { defer kv.Close() store := raftwal.Init(kv, opts.nodeId, 0) - closer := y.NewCloser(2) - // Initialize the servers. var st state - st.serveGRPC(grpcListener, closer, store) - st.serveHTTP(httpListener, closer) + st.serveGRPC(grpcListener, store) + st.serveHTTP(httpListener) http.HandleFunc("/state", st.getState) http.HandleFunc("/removeNode", st.removeNode) @@ -252,11 +249,8 @@ func run() { sdCh := make(chan os.Signal, 1) signal.Notify(sdCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - closer.AddRunning(2) - // handle signals go func() { - defer closer.Done() for { select { case sig, ok := <-sdCh: @@ -265,16 +259,17 @@ func run() { } glog.Infof("--- Received %s signal", sig) signal.Stop(sdCh) - st.zero.shutDownCh <- struct{}{} + st.zero.closer.Signal() } } }() + st.zero.closer.AddRunning(1) + go func() { - defer closer.Done() - <-st.zero.shutDownCh + defer st.zero.closer.Done() + <-st.zero.closer.HasBeenClosed() glog.Infoln("Shutting down...") - close(st.zero.shutDownCh) close(sdCh) // Close doesn't close already opened connections. httpListener.Close() @@ -283,6 +278,6 @@ func run() { }() glog.Infoln("Running Dgraph Zero...") - closer.SignalAndWait() + st.zero.closer.Wait() glog.Infoln("All done.") } diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go index f7e465702e1..67a0fe13c05 100644 --- a/dgraph/cmd/zero/zero.go +++ b/dgraph/cmd/zero/zero.go @@ -25,6 +25,7 @@ import ( otrace "go.opencensus.io/trace" "golang.org/x/net/context" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" @@ -56,8 +57,8 @@ type Server struct { // groupMap map[uint32]*Group nextGroup uint32 leaderChangeCh chan struct{} - shutDownCh chan struct{} // Used to tell stream to close. - connectLock sync.Mutex // Used to serialize connect requests from servers. + closer *y.Closer // Used to tell stream to close. + connectLock sync.Mutex // Used to serialize connect requests from servers. blockCommitsOn map[string]struct{} } @@ -76,7 +77,7 @@ func (s *Server) Init() { s.nextTxnTs = 1 s.nextGroup = 1 s.leaderChangeCh = make(chan struct{}, 1) - s.shutDownCh = make(chan struct{}, 1) + s.closer = y.NewCloser(2) // grpc and http s.blockCommitsOn = make(map[string]struct{}) go s.rebalanceTablets() } @@ -614,7 +615,7 @@ func (s *Server) StreamMembership(_ *api.Payload, stream pb.Zero_StreamMembershi } case <-ctx.Done(): return ctx.Err() - case <-s.shutDownCh: + case <-s.closer.HasBeenClosed(): return errServerShutDown } } From e094dbb99ea72f584ad18d6b421c45cea8d22697 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Fri, 25 Jan 2019 17:08:45 -0800 Subject: [PATCH 7/7] Remove /shutdown endpoint, and use closer in Raft node as well. --- dgraph/cmd/zero/http.go | 15 --------------- dgraph/cmd/zero/raft.go | 10 +++++++--- dgraph/cmd/zero/run.go | 12 ++++++++---- 3 files changed, 15 insertions(+), 22 deletions(-) diff --git a/dgraph/cmd/zero/http.go b/dgraph/cmd/zero/http.go index bae6c1b8f7f..e4eddf9be32 100644 --- a/dgraph/cmd/zero/http.go +++ b/dgraph/cmd/zero/http.go @@ -223,21 +223,6 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) { } } -func (st *state) shutdown(w http.ResponseWriter, r *http.Request) { - x.AddCorsHeaders(w) - if r.Method == "OPTIONS" { - return - } - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusBadRequest) - x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") - return - } - - st.zero.closer.Signal() - w.Write([]byte("Server is shutting down...\n")) -} - func (st *state) serveHTTP(l net.Listener) { srv := &http.Server{ ReadTimeout: 10 * time.Second, diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go index f687f19f7d3..525db6439bc 100644 --- a/dgraph/cmd/zero/raft.go +++ b/dgraph/cmd/zero/raft.go @@ -43,7 +43,7 @@ type node struct { ctx context.Context reads map[uint64]chan uint64 subscribers map[uint32]chan struct{} - stop chan struct{} // to send stop signal to Run + closer *y.Closer // to stop Run. // The last timestamp when this Zero was able to reach quorum. mu sync.RWMutex @@ -561,7 +561,11 @@ func (n *node) Run() { // it in goroutine readStateCh := make(chan raft.ReadState, 10) closer := y.NewCloser(4) - defer closer.SignalAndWait() + defer func() { + closer.SignalAndWait() + n.closer.Done() + glog.Infof("Zero Node.Run finished.") + }() go n.snapshotPeriodically(closer) go n.updateZeroMembershipPeriodically(closer) @@ -572,7 +576,7 @@ func (n *node) Run() { for { select { - case <-n.stop: + case <-n.closer.HasBeenClosed(): n.Raft().Stop() return case <-ticker.C: diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 743e8764ac8..9fe5a1eea9b 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc" "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/raftwal" @@ -138,7 +139,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { m.Cfg.DisableProposalForwarding = true st.rs = &conn.RaftServer{Node: m} - st.node = &node{Node: m, ctx: context.Background(), stop: make(chan struct{})} + st.node = &node{Node: m, ctx: context.Background(), closer: y.NewCloser(1)} st.zero = &Server{NumReplicas: opts.numReplicas, Node: st.node} st.zero.Init() st.node.server = st.zero @@ -149,8 +150,7 @@ func (st *state) serveGRPC(l net.Listener, store *raftwal.DiskStorage) { go func() { defer st.zero.closer.Done() err := s.Serve(l) - glog.Infof("gRpc server stopped : %v", err) - st.node.stop <- struct{}{} + glog.Infof("gRPC server stopped : %v", err) // Attempt graceful stop (waits for pending RPCs), but force a stop if // it doesn't happen in a reasonable amount of time. @@ -236,7 +236,6 @@ func run() { http.HandleFunc("/removeNode", st.removeNode) http.HandleFunc("/moveTablet", st.moveTablet) http.HandleFunc("/assign", st.assign) - http.HandleFunc("/shutdown", st.shutdown) zpages.Handle(http.DefaultServeMux, "/z") // This must be here. It does not work if placed before Grpc init. @@ -272,7 +271,12 @@ func run() { glog.Infoln("Shutting down...") close(sdCh) // Close doesn't close already opened connections. + + // Stop all HTTP requests. httpListener.Close() + // Stop Raft. + st.node.closer.SignalAndWait() + // Stop all internal requests. grpcListener.Close() st.node.trySnapshot(0) }()