diff --git a/config.go b/config.go index b67f8f6e48..e23da62485 100644 --- a/config.go +++ b/config.go @@ -155,6 +155,9 @@ type config struct { DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."` NoExistsAddrIndex bool `long:"noexistsaddrindex" description:"Disable the exists address index, which tracks whether or not an address has even been used."` DropExistsAddrIndex bool `long:"dropexistsaddrindex" description:"Deletes the exists address index from the database on start up and then exits."` + PipeRx uint `long:"piperx" description:"File descriptor of read end pipe to enable parent -> child process communication"` + PipeTx uint `long:"pipetx" description:"File descriptor of write end pipe to enable parent <- child process communication"` + LifetimeEvents bool `long:"lifetimeevents" description:"Send lifetime notifications over the TX pipe"` onionlookup func(string) ([]net.IP, error) lookup func(string) ([]net.IP, error) oniondial func(string, string) (net.Conn, error) diff --git a/dcrd.go b/dcrd.go index 1508c8b911..26ae539aa7 100644 --- a/dcrd.go +++ b/dcrd.go @@ -17,13 +17,11 @@ import ( "time" "github.com/decred/dcrd/blockchain/indexers" + "github.com/decred/dcrd/blockchain/stake" "github.com/decred/dcrd/limits" ) -var ( - cfg *config - shutdownChannel = make(chan struct{}) -) +var cfg *config // winServiceMain is only invoked on Windows. It detects when dcrd is running // as a service and reacts accordingly. @@ -44,6 +42,9 @@ func dcrdMain(serverChan chan<- *server) error { cfg = tcfg defer backendLog.Flush() + interrupted := interruptListener() + defer dcrdLog.Info("Shutdown complete") + // Show version at startup. dcrdLog.Infof("Version %s", version()) // Show dcrd home dir location @@ -92,38 +93,50 @@ func dcrdMain(serverChan chan<- *server) error { }() } + var lifetimeNotifier lifetimeEventServer + if cfg.LifetimeEvents { + lifetimeNotifier = newLifetimeEventServer(outgoingPipeMessages) + } + + if cfg.PipeRx != 0 { + go serviceControlPipeRx(uintptr(cfg.PipeRx)) + } + if cfg.PipeTx != 0 { + go serviceControlPipeTx(uintptr(cfg.PipeTx)) + } else { + go drainOutgoingPipeMessages() + } + + if interruptRequested(interrupted) { + return nil + } + // Perform upgrades to dcrd as new versions require it. if err := doUpgrades(); err != nil { dcrdLog.Errorf("%v", err) return err } - // Load the block database. - db, err := loadBlockDB() - if err != nil { - dcrdLog.Errorf("%v", err) - return err + if interruptRequested(interrupted) { + return nil } - defer db.Close() - tmdb, err := loadTicketDB(db, activeNetParams.Params) + // Load the block database. + lifetimeNotifier.notifyStartupEvent(lifetimeEventDBOpen) + db, err := loadBlockDB() if err != nil { dcrdLog.Errorf("%v", err) return err } defer func() { - err := tmdb.Store(cfg.DataDir, "ticketdb.gob") - if err != nil { - dcrdLog.Errorf("Failed to store ticket database: %v", err.Error()) - } - }() - defer tmdb.Close() - - // Ensure the databases are sync'd and closed on Ctrl+C. - addInterruptHandler(func() { + lifetimeNotifier.notifyShutdownEvent(lifetimeEventDBOpen) dcrdLog.Infof("Gracefully shutting down the database...") db.Close() - }) + }() + + if interruptRequested(interrupted) { + return nil + } // Drop indexes and exit if requested. // @@ -154,7 +167,41 @@ func dcrdMain(serverChan chan<- *server) error { return nil } + // The ticket "DB" takes ages to load and serialize back out to a file. 
+	// Load it asynchronously and if the process is interrupted during the
+	// load, discard the result since no cleanup is necessary.
+	lifetimeNotifier.notifyStartupEvent(lifetimeEventTicketDB)
+	type ticketDBResult struct {
+		ticketDB *stake.TicketDB
+		err      error
+	}
+	ticketDBResultChan := make(chan ticketDBResult)
+	go func() {
+		tmdb, err := loadTicketDB(db, activeNetParams.Params)
+		ticketDBResultChan <- ticketDBResult{tmdb, err}
+	}()
+	var tmdb *stake.TicketDB
+	select {
+	case <-interrupted:
+		return nil
+	case r := <-ticketDBResultChan:
+		if r.err != nil {
+			dcrdLog.Errorf("%v", r.err)
+			return r.err
+		}
+		tmdb = r.ticketDB
+	}
+	defer func() {
+		lifetimeNotifier.notifyShutdownEvent(lifetimeEventTicketDB)
+		tmdb.Close()
+		err := tmdb.Store(cfg.DataDir, "ticketdb.gob")
+		if err != nil {
+			dcrdLog.Errorf("Failed to store ticket database: %v", err.Error())
+		}
+	}()
+
 	// Create server and start it.
+	lifetimeNotifier.notifyStartupEvent(lifetimeEventP2PServer)
 	server, err := newServer(cfg.Listeners, db, tmdb, activeNetParams.Params)
 	if err != nil {
 		// TODO(oga) this logging could do with some beautifying.
@@ -162,32 +209,28 @@ func dcrdMain(serverChan chan<- *server) error {
 			cfg.Listeners, err)
 		return err
 	}
-	addInterruptHandler(func() {
+	defer func() {
+		lifetimeNotifier.notifyShutdownEvent(lifetimeEventP2PServer)
 		dcrdLog.Infof("Gracefully shutting down the server...")
 		server.Stop()
 		server.WaitForShutdown()
-	})
+		srvrLog.Infof("Server shutdown complete")
+	}()
+
 	server.Start()
 	if serverChan != nil {
 		serverChan <- server
 	}
 
-	// Monitor for graceful server shutdown and signal the main goroutine
-	// when done. This is done in a separate goroutine rather than waiting
-	// directly so the main goroutine can be signaled for shutdown by either
-	// a graceful shutdown or from the main interrupt handler. This is
-	// necessary since the main goroutine must be kept running long enough
-	// for the interrupt handler goroutine to finish.
-	go func() {
-		server.WaitForShutdown()
-		srvrLog.Infof("Server shutdown complete")
-		shutdownChannel <- struct{}{}
-	}()
+	if interruptRequested(interrupted) {
+		return nil
+	}
+
+	lifetimeNotifier.notifyStartupComplete()
 
-	// Wait for shutdown signal from either a graceful server stop or from
-	// the interrupt handler.
-	<-shutdownChannel
-	dcrdLog.Info("Shutdown complete")
+	// Wait until the interrupt signal is received from an OS signal or
+	// shutdown is requested through the RPC server.
+	<-interrupted
 	return nil
 }
diff --git a/ipc.go b/ipc.go
new file mode 100644
index 0000000000..dd39365e69
--- /dev/null
+++ b/ipc.go
@@ -0,0 +1,207 @@
+// Copyright (c) 2016 The Decred developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"os"
+)
+
+// Messages sent over a pipe are encoded using a simple binary message format:
+//
+// - Message type length (1 byte)
+// - Message type string (encoded as UTF8, no longer than 255 bytes)
+// - Message payload length (4 bytes, little endian)
+// - Message payload bytes (no longer than 2^32 - 1 bytes)
+type pipeMessage interface {
+	Type() string
+	PayloadSize() uint32
+	WritePayload(w io.Writer) error
+}
+
+var outgoingPipeMessages = make(chan pipeMessage)
+
+// serviceControlPipeRx reads from the file descriptor fd of a read end pipe.
+// This is intended to be used as a simple control mechanism for parent
+// processes to communicate with and manage the lifetime of a dcrd child
+// process using a unidirectional pipe (on Windows, this is an anonymous pipe,
+// not a named pipe).
+//
+// When the pipe is closed, or any other error occurs while reading the control
+// message, shutdown begins. This prevents dcrd from continuing to run
+// unsupervised after the parent process closes unexpectedly.
+//
+// No control messages are currently defined, and the only use for the pipe is
+// to start clean shutdown when the pipe is closed. Control messages that
+// follow the pipe message format can be added later as needed.
+func serviceControlPipeRx(fd uintptr) {
+	pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
+	r := bufio.NewReader(pipe)
+	for {
+		_, err := r.Discard(1024)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			dcrdLog.Errorf("Failed to read from pipe: %v", err)
+			break
+		}
+	}
+
+	select {
+	case shutdownRequestChannel <- struct{}{}:
+	default:
+	}
+}
+
+// serviceControlPipeTx sends pipe messages to the file descriptor fd of a write
+// end pipe. This is intended to be a simple response and notification system
+// for a child dcrd process to communicate with a parent process without the
+// need to go through the RPC server.
+//
+// See the comment on the pipeMessage interface for the binary encoding of a
+// pipe message.
+func serviceControlPipeTx(fd uintptr) {
+	defer drainOutgoingPipeMessages()
+
+	pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
+	w := bufio.NewWriter(pipe)
+	headerBuffer := make([]byte, 0, 1+255+4) // capped to max header size
+	var err error
+	for m := range outgoingPipeMessages {
+		mtype := m.Type()
+		psize := m.PayloadSize()
+
+		headerBuffer = append(headerBuffer, byte(len(mtype)))
+		headerBuffer = append(headerBuffer, mtype...)
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, psize)
+		headerBuffer = append(headerBuffer, buf...)
+
+		_, err = w.Write(headerBuffer)
+		if err != nil {
+			break
+		}
+
+		err = m.WritePayload(w)
+		if err != nil {
+			break
+		}
+
+		err = w.Flush()
+		if err != nil {
+			break
+		}
+
+		headerBuffer = headerBuffer[:0]
+	}
+
+	dcrdLog.Errorf("Failed to write to pipe: %v", err)
+}
+
+func drainOutgoingPipeMessages() {
+	for range outgoingPipeMessages {
+	}
+}
+
+// The lifetimeEvent describes a startup or shutdown event. The message type
+// string is "lifetimeevent".
+//
+// The payload is always 2 bytes long. The first byte describes whether a
+// service or event is about to run or whether startup has completed. The
+// second byte, when applicable, describes which event or service is about to
+// start or stop.
+//
+// 0: A startup event is about to run
+// 1: All startup tasks have completed
+// 2: A shutdown event is about to run
+//
+// Event IDs can take on the following values:
+//
+// 0: Database opening/closing
+// 1: Ticket database opening/closing
+// 2: Peer-to-peer server starting/stopping
+//
+// Note that, depending on which features are enabled through the config, not
+// all subsystems are started or stopped and not every event runs during the
+// program's lifetime.
+// +// As an example, the following messages may be sent during a typical execution: +// +// 0 0: The database is being opened +// 0 1: The ticket DB is being opened +// 0 2: The P2P server is starting +// 1 0: All startup tasks have completed +// 2 2: The P2P server is stopping +// 2 1: The ticket DB is being closed and written to disk +// 2 0: The database is being closed +type lifetimeEvent struct { + event lifetimeEventID + action lifetimeAction +} + +var _ pipeMessage = (*lifetimeEvent)(nil) + +type lifetimeEventID byte + +const ( + startupEvent lifetimeEventID = iota + startupComplete + shutdownEvent +) + +type lifetimeAction byte + +const ( + lifetimeEventDBOpen lifetimeAction = iota + lifetimeEventTicketDB + lifetimeEventP2PServer +) + +func (*lifetimeEvent) Type() string { return "lifetimeevent" } +func (e *lifetimeEvent) PayloadSize() uint32 { return 2 } +func (e *lifetimeEvent) WritePayload(w io.Writer) error { + _, err := w.Write([]byte{byte(e.event), byte(e.action)}) + return err +} + +type lifetimeEventServer chan<- pipeMessage + +func newLifetimeEventServer(outChan chan<- pipeMessage) lifetimeEventServer { + return lifetimeEventServer(outChan) +} + +func (s lifetimeEventServer) notifyStartupEvent(action lifetimeAction) { + if s == nil { + return + } + s <- &lifetimeEvent{ + event: startupEvent, + action: action, + } +} + +func (s lifetimeEventServer) notifyStartupComplete() { + if s == nil { + return + } + s <- &lifetimeEvent{ + event: startupComplete, + action: 0, + } +} + +func (s lifetimeEventServer) notifyShutdownEvent(action lifetimeAction) { + if s == nil { + return + } + s <- &lifetimeEvent{ + event: shutdownEvent, + action: action, + } +} diff --git a/rpcserver.go b/rpcserver.go index f7abef3790..da2e3675e8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5064,7 +5064,10 @@ func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) // handleStop implements the stop command. func handleStop(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - s.server.Stop() + select { + case s.requestProcessShutdown <- struct{}{}: + default: + } return "dcrd stopping.", nil } @@ -5718,24 +5721,25 @@ func handleVerifyMessage(s *rpcServer, cmd interface{}, closeChan <-chan struct{ // rpcServer holds the items the rpc server may need to access (config, // shutdown, main server, etc.) 
 type rpcServer struct {
-	started      int32
-	shutdown     int32
-	policy       *mining.Policy
-	server       *server
-	chain        *blockchain.BlockChain
-	authsha      [sha256.Size]byte
-	limitauthsha [sha256.Size]byte
-	ntfnMgr      *wsNotificationManager
-	numClients   int32
-	statusLines  map[int]string
-	statusLock   sync.RWMutex
-	wg           sync.WaitGroup
-	listeners    []net.Listener
-	workState    *workState
-	gbtWorkState *gbtWorkState
-	templatePool map[[merkleRootPairSize]byte]*workStateBlockInfo
-	helpCacher   *helpCacher
-	quit         chan int
+	started                int32
+	shutdown               int32
+	policy                 *mining.Policy
+	server                 *server
+	chain                  *blockchain.BlockChain
+	authsha                [sha256.Size]byte
+	limitauthsha           [sha256.Size]byte
+	ntfnMgr                *wsNotificationManager
+	numClients             int32
+	statusLines            map[int]string
+	statusLock             sync.RWMutex
+	wg                     sync.WaitGroup
+	listeners              []net.Listener
+	workState              *workState
+	gbtWorkState           *gbtWorkState
+	templatePool           map[[merkleRootPairSize]byte]*workStateBlockInfo
+	helpCacher             *helpCacher
+	requestProcessShutdown chan struct{}
+	quit                   chan int
 
 	// coin supply caching values
 	coinSupplyMtx sync.Mutex
@@ -5824,6 +5828,13 @@ func (s *rpcServer) Stop() error {
 	return nil
 }
 
+// RequestedProcessShutdown returns a channel that is sent to when an authorized
+// RPC client requests the process to shut down. If the request cannot be read
+// immediately, it is dropped.
+func (s *rpcServer) RequestedProcessShutdown() <-chan struct{} {
+	return s.requestProcessShutdown
+}
+
 // limitConnections responds with a 503 service unavailable and returns true if
 // adding another client would exceed the maximum allow RPC clients.
 //
@@ -6205,15 +6216,16 @@ func genCertPair(certFile, keyFile string) error {
 // newRPCServer returns a new instance of the rpcServer struct.
 func newRPCServer(listenAddrs []string, policy *mining.Policy, s *server) (*rpcServer, error) {
 	rpc := rpcServer{
-		policy:       policy,
-		server:       s,
-		chain:        s.blockManager.chain,
-		statusLines:  make(map[int]string),
-		workState:    newWorkState(),
-		templatePool: make(map[[merkleRootPairSize]byte]*workStateBlockInfo),
-		gbtWorkState: newGbtWorkState(s.timeSource),
-		helpCacher:   newHelpCacher(),
-		quit:         make(chan int),
+		policy:                 policy,
+		server:                 s,
+		chain:                  s.blockManager.chain,
+		statusLines:            make(map[int]string),
+		workState:              newWorkState(),
+		templatePool:           make(map[[merkleRootPairSize]byte]*workStateBlockInfo),
+		gbtWorkState:           newGbtWorkState(s.timeSource),
+		helpCacher:             newHelpCacher(),
+		requestProcessShutdown: make(chan struct{}),
+		quit:                   make(chan int),
 	}
 	if cfg.RPCUser != "" && cfg.RPCPass != "" {
 		login := cfg.RPCUser + ":" + cfg.RPCPass
diff --git a/server.go b/server.go
index 8c27d807d3..fec8476a83 100644
--- a/server.go
+++ b/server.go
@@ -2662,6 +2662,12 @@ func newServer(listenAddrs []string, db database.DB, tmdb *stake.TicketDB, chain
 		if err != nil {
 			return nil, err
 		}
+
+		// Signal process shutdown when the RPC server requests it.
+		go func() {
+			<-s.rpcServer.RequestedProcessShutdown()
+			shutdownRequestChannel <- struct{}{}
+		}()
 	}
 
 	return &s, nil
diff --git a/service_windows.go b/service_windows.go
index 3dee289106..7a1c3772d0 100644
--- a/service_windows.go
+++ b/service_windows.go
@@ -86,17 +86,11 @@ loop:
 				// more commands while pending.
 				changes <- svc.Status{State: svc.StopPending}
 
-				// Stop the main server gracefully when it is
-				// already setup or just break out and allow
-				// the service to exit immediately if it's not
-				// setup yet.
Note that calling Stop will cause - // dcrdMain to exit in the goroutine above which - // will in turn send a signal (and a potential - // error) to doneChan. - if mainServer != nil { - mainServer.Stop() - } else { - break loop + // Signal the main function to exit if shutdown + // was not already requested. + select { + case shutdownRequestChannel <- struct{}{}: + default: } default: diff --git a/signal.go b/signal.go index 7235896910..04d1988392 100644 --- a/signal.go +++ b/signal.go @@ -10,79 +10,45 @@ import ( "os/signal" ) -// interruptChannel is used to receive SIGINT (Ctrl+C) signals. -var interruptChannel chan os.Signal +// shutdownRequestChannel is used to initiate shutdown from one of the +// subsystems using the same code paths as when an interrupt signal is received. +var shutdownRequestChannel = make(chan struct{}) -// addHandlerChannel is used to add an interrupt handler to the list of handlers -// to be invoked on SIGINT (Ctrl+C) signals. -var addHandlerChannel = make(chan func()) +// interruptSignals defines the default signals to catch in order to do a proper +// shutdown. This may be modified during init depending on the platform. +var interruptSignals = []os.Signal{os.Interrupt} -// signals defines the default signals to catch in order to do a proper -// shutdown. -var signals = []os.Signal{os.Interrupt} +// interruptListener listens for SIGINT (Ctrl+C) signals and shutdown requests +// from shutdownRequestChannel. It returns a channel that is closed when either +// signal is received. +func interruptListener() <-chan struct{} { + c := make(chan struct{}) -// mainInterruptHandler listens for SIGINT (Ctrl+C) signals on the -// interruptChannel and invokes the registered interruptCallbacks accordingly. -// It also listens for callback registration. It must be run as a goroutine. -func mainInterruptHandler() { - // interruptCallbacks is a list of callbacks to invoke when a - // SIGINT (Ctrl+C) is received. - var interruptCallbacks []func() + go func() { + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, interruptSignals...) - // isShutdown is a flag which is used to indicate whether or not - // the shutdown signal has already been received and hence any future - // attempts to add a new interrupt handler should invoke them - // immediately. - var isShutdown bool - - for { select { case sig := <-interruptChannel: - // Ignore more than one shutdown signal. - if isShutdown { - dcrdLog.Infof("Received signal (%s). "+ - "Already shutting down...", sig) - continue - } - - isShutdown = true - dcrdLog.Infof("Received signal (%s). Shutting down...", - sig) - - // Run handlers in LIFO order. - for i := range interruptCallbacks { - idx := len(interruptCallbacks) - 1 - i - callback := interruptCallbacks[idx] - callback() - } - - // Signal the main goroutine to shutdown. - go func() { - shutdownChannel <- struct{}{} - }() + dcrdLog.Infof("Received signal (%s). Shutting down...", sig) + case <-shutdownRequestChannel: + dcrdLog.Infof("Shutdown requested. Shutting down...") + } - case handler := <-addHandlerChannel: - // The shutdown signal has already been received, so - // just invoke and new handlers immediately. - if isShutdown { - handler() - } + close(c) + }() - interruptCallbacks = append(interruptCallbacks, handler) - } - } + return c } -// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is -// received. 
-func addInterruptHandler(handler func()) { - // Create the channel and start the main interrupt handler which invokes - // all other callbacks and exits if not already done. - if interruptChannel == nil { - interruptChannel = make(chan os.Signal, 1) - signal.Notify(interruptChannel, signals...) - go mainInterruptHandler() +// interruptRequested returns true when the channel returned by +// interruptListener was closed. This simplifies early shutdown slightly since +// the caller can just use an if statement instead of a select. +func interruptRequested(interrupted <-chan struct{}) bool { + select { + case <-interrupted: + return true + default: + return false } - - addHandlerChannel <- handler } diff --git a/signalsigterm.go b/signalsigterm.go index 7aaa39a706..831655010e 100644 --- a/signalsigterm.go +++ b/signalsigterm.go @@ -12,5 +12,5 @@ import ( ) func init() { - signals = []os.Signal{os.Interrupt, syscall.SIGTERM} + interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} }
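
For anyone reviewing how the new flags fit together: the supervising parent creates both pipes, hands the child's ends to dcrd, and keeps the other ends for itself. Below is a minimal parent-side sketch in Go (Unix only; it assumes a dcrd binary on PATH). The descriptor numbers follow from os/exec's ExtraFiles contract, where entry i becomes descriptor 3+i in the child; everything else here is illustrative and not part of this patch.

package main

import (
	"io"
	"log"
	"os"
	"os/exec"
	"time"
)

func main() {
	// Parent -> child control pipe: dcrd reads this end until EOF (--piperx).
	childRx, parentTx, err := os.Pipe()
	if err != nil {
		log.Fatal(err)
	}
	// Child -> parent event pipe: dcrd writes pipe messages here (--pipetx).
	parentRx, childTx, err := os.Pipe()
	if err != nil {
		log.Fatal(err)
	}

	// ExtraFiles entry i becomes descriptor 3+i in the child, hence
	// --piperx=3 and --pipetx=4.
	cmd := exec.Command("dcrd", "--piperx=3", "--pipetx=4", "--lifetimeevents")
	cmd.ExtraFiles = []*os.File{childRx, childTx}
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	// Close the parent's copies of the child's ends so EOF propagates.
	childRx.Close()
	childTx.Close()

	// Drain lifetime events (or decode them; see the reader sketch below).
	go io.Copy(io.Discard, parentRx)

	// Closing the parent -> child pipe asks dcrd to begin a clean shutdown,
	// since serviceControlPipeRx treats EOF (or any read error) as a
	// shutdown request.
	time.Sleep(30 * time.Second)
	parentTx.Close()
	log.Printf("dcrd exited: %v", cmd.Wait())
}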
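
The framing documented above the pipeMessage interface is simple to decode on the parent side. Here is a reader sketch matching that byte layout and the lifetimeevent payload encoding from ipc.go; the human-readable labels and function names are invented for illustration. Dropped into the supervisor above, watchLifetimeEvents(parentRx) would replace the io.Copy drain.

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// readPipeMessage reads one framed message: a 1-byte type length, the type
// string, a 4-byte little-endian payload length, and the payload itself.
func readPipeMessage(r io.Reader) (msgType string, payload []byte, err error) {
	var tlen [1]byte
	if _, err = io.ReadFull(r, tlen[:]); err != nil {
		return "", nil, err
	}
	tbuf := make([]byte, tlen[0])
	if _, err = io.ReadFull(r, tbuf); err != nil {
		return "", nil, err
	}
	var plen [4]byte
	if _, err = io.ReadFull(r, plen[:]); err != nil {
		return "", nil, err
	}
	payload = make([]byte, binary.LittleEndian.Uint32(plen[:]))
	if _, err = io.ReadFull(r, payload); err != nil {
		return "", nil, err
	}
	return string(tbuf), payload, nil
}

// watchLifetimeEvents prints decoded lifetime events until the pipe closes.
func watchLifetimeEvents(r io.Reader) {
	events := []string{"database", "ticket database", "P2P server"}
	for {
		msgType, payload, err := readPipeMessage(r)
		if err != nil {
			return // EOF: dcrd exited or closed its write end
		}
		if msgType != "lifetimeevent" || len(payload) != 2 ||
			int(payload[1]) >= len(events) {
			continue // skip anything this sketch does not understand
		}
		switch payload[0] {
		case 0:
			fmt.Println("starting:", events[payload[1]])
		case 1:
			fmt.Println("all startup tasks completed")
		case 2:
			fmt.Println("stopping:", events[payload[1]])
		}
	}
}

func main() {
	// For demonstration, decode a captured event stream from stdin.
	watchLifetimeEvents(os.Stdin)
}

For a full run this prints the sequence from the ipc.go comment: the three startup events, the startup-complete marker, and then the shutdown events in reverse order.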
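
Because serviceControlPipeTx frames anything satisfying pipeMessage, a future notification type only needs the three interface methods. A hypothetical example as it could appear in ipc.go, where io is already imported (the "logrotate" message type and its payload are invented purely to illustrate the contract; they are not part of this change):

// logRotateEvent is a hypothetical outgoing message. Sending one on
// outgoingPipeMessages would be framed by serviceControlPipeTx as
// 0x09 "logrotate" <4-byte little-endian length> <path bytes>.
type logRotateEvent struct {
	path string
}

func (*logRotateEvent) Type() string          { return "logrotate" }
func (e *logRotateEvent) PayloadSize() uint32 { return uint32(len(e.path)) }
func (e *logRotateEvent) WritePayload(w io.Writer) error {
	_, err := io.WriteString(w, e.path)
	return err
}

Two invariants matter for any new type: the type string must fit in the single length byte (at most 255 bytes), and WritePayload must write exactly PayloadSize() bytes, because the header declaring the payload length is written before WritePayload runs.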