Skip to content

Commit

Permalink
Add pipes for parent process IPC.
Browse files Browse the repository at this point in the history
Rewrite startup/shutdown logic to simplify shutdown signaling.  All
cleanup is now run from deferred functions in the main function.

Add two new config options to set the read and write ends of a pair of
pipes.  This is used as a simple mechanism for a parent process to
communicate with, observe, and manage the lifetime of a child dcrd
process.  When the RX (read end) pipe is closed, clean shutdown
automatically begins.

Add a new flag --lifetimeevents to create and send lifetime event
notifications over the TX (write end) pipe during bringup and
shutdown.  This allows the parent process to observe which subsystems
are currently starting, running, and stopping.

Fixes #297.
Fixes #298.
  • Loading branch information
jrick committed Aug 29, 2016
1 parent 99b7aad commit e0c3df8
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 142 deletions.
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ type config struct {
DropAddrIndex bool `long:"dropaddrindex" description:"Deletes the address-based transaction index from the database on start up and then exits."`
NoExistsAddrIndex bool `long:"noexistsaddrindex" description:"Disable the exists address index, which tracks whether or not an address has even been used."`
DropExistsAddrIndex bool `long:"dropexistsaddrindex" description:"Deletes the exists address index from the database on start up and then exits."`
PipeRx uint `long:"piperx" description:"File descriptor of read end pipe to enable parent -> child process communication"`
PipeTx uint `long:"pipetx" description:"File descriptor of write end pipe to enable parent <- child process communication"`
LifetimeEvents bool `long:"lifetimeevents" description:"Send lifetime notifications over the TX pipe"`
onionlookup func(string) ([]net.IP, error)
lookup func(string) ([]net.IP, error)
oniondial func(string, string) (net.Conn, error)
Expand Down
119 changes: 81 additions & 38 deletions dcrd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ import (
"time"

"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/limits"
)

var (
cfg *config
shutdownChannel = make(chan struct{})
)
var cfg *config

// winServiceMain is only invoked on Windows. It detects when dcrd is running
// as a service and reacts accordingly.
Expand All @@ -44,6 +42,9 @@ func dcrdMain(serverChan chan<- *server) error {
cfg = tcfg
defer backendLog.Flush()

interrupted := interruptListener()
defer dcrdLog.Info("Shutdown complete")

// Show version at startup.
dcrdLog.Infof("Version %s", version())
// Show dcrd home dir location
Expand Down Expand Up @@ -92,38 +93,50 @@ func dcrdMain(serverChan chan<- *server) error {
}()
}

var lifetimeNotifier lifetimeEventServer
if cfg.LifetimeEvents {
lifetimeNotifier = newLifetimeEventServer(outgoingPipeMessages)
}

if cfg.PipeRx != 0 {
go serviceControlPipeRx(uintptr(cfg.PipeRx))
}
if cfg.PipeTx != 0 {
go serviceControlPipeTx(uintptr(cfg.PipeTx))
} else {
go drainOutgoingPipeMessages()
}

if interruptRequested(interrupted) {
return nil
}

// Perform upgrades to dcrd as new versions require it.
if err := doUpgrades(); err != nil {
dcrdLog.Errorf("%v", err)
return err
}

// Load the block database.
db, err := loadBlockDB()
if err != nil {
dcrdLog.Errorf("%v", err)
return err
if interruptRequested(interrupted) {
return nil
}
defer db.Close()

tmdb, err := loadTicketDB(db, activeNetParams.Params)
// Load the block database.
lifetimeNotifier.notifyStartupEvent(lifetimeEventDBOpen)
db, err := loadBlockDB()
if err != nil {
dcrdLog.Errorf("%v", err)
return err
}
defer func() {
err := tmdb.Store(cfg.DataDir, "ticketdb.gob")
if err != nil {
dcrdLog.Errorf("Failed to store ticket database: %v", err.Error())
}
}()
defer tmdb.Close()

// Ensure the databases are sync'd and closed on Ctrl+C.
addInterruptHandler(func() {
lifetimeNotifier.notifyShutdownEvent(lifetimeEventDBOpen)
dcrdLog.Infof("Gracefully shutting down the database...")
db.Close()
})
}()

if interruptRequested(interrupted) {
return nil
}

// Drop indexes and exit if requested.
//
Expand Down Expand Up @@ -154,40 +167,70 @@ func dcrdMain(serverChan chan<- *server) error {
return nil
}

// The ticket "DB" takes ages to load and serialize back out to a file.
// Load it asynchronously and if the process is interrupted during the
// load, discard the result since no cleanup is necessary.
lifetimeNotifier.notifyStartupEvent(lifetimeEventTicketDB)
type ticketDBResult struct {
ticketDB *stake.TicketDB
err error
}
ticketDBResultChan := make(chan ticketDBResult)
go func() {
tmdb, err := loadTicketDB(db, activeNetParams.Params)
ticketDBResultChan <- ticketDBResult{tmdb, err}
}()
var tmdb *stake.TicketDB
select {
case <-interrupted:
return nil
case r := <-ticketDBResultChan:
if r.err != nil {
dcrdLog.Errorf("%v", err)
return err
}
tmdb = r.ticketDB
}
defer func() {
lifetimeNotifier.notifyShutdownEvent(lifetimeEventTicketDB)
tmdb.Close()
err := tmdb.Store(cfg.DataDir, "ticketdb.gob")
if err != nil {
dcrdLog.Errorf("Failed to store ticket database: %v", err.Error())
}
}()

// Create server and start it.
lifetimeNotifier.notifyStartupEvent(lifetimeEventP2PServer)
server, err := newServer(cfg.Listeners, db, tmdb, activeNetParams.Params)
if err != nil {
// TODO(oga) this logging could do with some beautifying.
dcrdLog.Errorf("Unable to start server on %v: %v",
cfg.Listeners, err)
return err
}
addInterruptHandler(func() {
defer func() {
lifetimeNotifier.notifyShutdownEvent(lifetimeEventP2PServer)
dcrdLog.Infof("Gracefully shutting down the server...")
server.Stop()
server.WaitForShutdown()
})
srvrLog.Infof("Server shutdown complete")
}()

server.Start()
if serverChan != nil {
serverChan <- server
}

// Monitor for graceful server shutdown and signal the main goroutine
// when done. This is done in a separate goroutine rather than waiting
// directly so the main goroutine can be signaled for shutdown by either
// a graceful shutdown or from the main interrupt handler. This is
// necessary since the main goroutine must be kept running long enough
// for the interrupt handler goroutine to finish.
go func() {
server.WaitForShutdown()
srvrLog.Infof("Server shutdown complete")
shutdownChannel <- struct{}{}
}()
if interruptRequested(interrupted) {
return nil
}

lifetimeNotifier.notifyStartupComplete()

// Wait for shutdown signal from either a graceful server stop or from
// the interrupt handler.
<-shutdownChannel
dcrdLog.Info("Shutdown complete")
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through the RPC server.
<-interrupted
return nil
}

Expand Down
207 changes: 207 additions & 0 deletions ipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright (c) 2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
)

// Messages sent over a pipe are encoded using a simple binary message format:
//
//   - Message type length (1 byte)
//   - Message type string (encoded as UTF8, no longer than 255 bytes)
//   - Message payload length (4 bytes, little endian)
//   - Message payload bytes (no longer than 2^32 - 1 bytes)
//
// pipeMessage is the interface a value must implement to be sent over the
// outgoing pipe.
type pipeMessage interface {
	// Type returns the message type string that is written, prefixed by its
	// one-byte length, at the start of the message header.
	Type() string

	// PayloadSize returns the payload length that is written to the message
	// header as a 4-byte little endian integer.
	PayloadSize() uint32

	// WritePayload writes the payload bytes to w. It is called after the
	// header has been written.
	WritePayload(w io.Writer) error
}

// outgoingPipeMessages is the unbuffered channel that carries pipe messages
// to the writer.  serviceControlPipeTx receives from it and encodes each
// message to the TX pipe, while drainOutgoingPipeMessages receives from it and
// discards every message.
var outgoingPipeMessages = make(chan pipeMessage)

// serviceControlPipeRx reads from the file descriptor fd of a read end pipe.
// This is intended to be used as a simple control mechanism for parent
// processes to communicate with and manage the lifetime of a dcrd child
// process using a unidirectional pipe (on Windows, this is an anonymous pipe,
// not a named pipe).
//
// When the pipe is closed or any other errors occur reading the control
// message, shutdown begins.  This prevents dcrd from continuing to run
// unsupervised after the parent process closes unexpectedly.
//
// No control messages are currently defined and the only use for the pipe is to
// start clean shutdown when the pipe is closed.  Control messages that follow
// the pipe message format can be added later as needed.
func serviceControlPipeRx(fd uintptr) {
	pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
	r := bufio.NewReader(pipe)
	for {
		// Everything read is thrown away; only the error matters.
		_, err := r.Discard(1024)
		if err != nil {
			// io.EOF means the write end was closed, which is the expected
			// shutdown trigger and is not worth logging.
			if err != io.EOF {
				dcrdLog.Errorf("Failed to read from pipe: %v", err)
			}
			break
		}
	}

	// Request shutdown without blocking: if nothing is ready to receive on
	// shutdownRequestChannel at this moment, the request is dropped.
	select {
	case shutdownRequestChannel <- struct{}{}:
	default:
	}
}

// serviceControlPipeTx sends pipe messages to the file descriptor fd of a write
// end pipe.  This is intended to be a simple response and notification system
// for a child dcrd process to communicate with a parent process without the
// need to go through the RPC server.
//
// Messages are received from outgoingPipeMessages and each one is flushed to
// the pipe before the next is read.  A message whose type string does not fit
// the one-byte length prefix is logged and dropped rather than written with a
// wrapped length.  The first write error stops the writer; from then on the
// remaining messages are drained and discarded.
//
// See the comment on the pipeMessage interface for the binary encoding of a
// pipe message.
func serviceControlPipeTx(fd uintptr) {
	// Keep receiving after this function stops writing so that senders on
	// the unbuffered outgoingPipeMessages channel are not left blocked.
	defer drainOutgoingPipeMessages()

	pipe := os.NewFile(fd, fmt.Sprintf("|%v", fd))
	w := bufio.NewWriter(pipe)
	headerBuffer := make([]byte, 0, 1+255+4) // capped to max header size
	var psizeBuf [4]byte                     // reused for every message
	var err error
	for m := range outgoingPipeMessages {
		mtype := m.Type()
		psize := m.PayloadSize()

		// The type length is encoded in a single byte.  Converting a
		// larger length to byte would wrap and corrupt the framing for
		// the reader, so refuse to send such a message.
		if len(mtype) > 255 {
			dcrdLog.Errorf("Dropping pipe message: type length %d "+
				"exceeds 255 bytes", len(mtype))
			continue
		}

		headerBuffer = append(headerBuffer[:0], byte(len(mtype)))
		headerBuffer = append(headerBuffer, mtype...)
		binary.LittleEndian.PutUint32(psizeBuf[:], psize)
		headerBuffer = append(headerBuffer, psizeBuf[:]...)

		if _, err = w.Write(headerBuffer); err != nil {
			break
		}
		if err = m.WritePayload(w); err != nil {
			break
		}
		if err = w.Flush(); err != nil {
			break
		}
	}

	// Only report a failure when one actually occurred; the loop also ends
	// without an error if the channel is ever closed.
	if err != nil {
		dcrdLog.Errorf("Failed to write to pipe: %v", err)
	}
}

// drainOutgoingPipeMessages receives and discards every message sent on
// outgoingPipeMessages.  It returns only once the channel has been closed.
func drainOutgoingPipeMessages() {
	for {
		if _, open := <-outgoingPipeMessages; !open {
			return
		}
	}
}

// The lifetimeEvent describes a startup or shutdown event.  The message type
// string is "lifetimeevent".
//
// The payload size is always 2 bytes long.  The first byte describes whether a
// service or event is about to run or whether startup has completed.  The
// second byte, when applicable, describes which event or service is about to
// start or stop.
//
//	0 <event id>: The startup event is about to run
//	1 <ignored>:  All startup tasks have completed
//	2 <event id>: The shutdown event is about to run
//
// Event IDs can take on the following values:
//
//	0: Database opening/closing
//	1: Ticket database opening/closing
//	2: Peer-to-peer server starting/stopping
//
// Note that not all subsystems are started/stopped or events run during the
// program's lifetime depending on what features are enabled through the config.
//
// As an example, the following messages may be sent during a typical execution:
//
//	0 0: The database is being opened
//	0 1: The ticket DB is being opened
//	0 2: The P2P server is starting
//	1 0: All startup tasks have completed
//	2 2: The P2P server is stopping
//	2 1: The ticket DB is being closed and written to disk
//	2 0: The database is being closed
type lifetimeEvent struct {
	// event is written as the first payload byte.  Despite its name it
	// holds the lifecycle phase (startup, startup complete, or shutdown),
	// not the "event id" of the description above.
	event lifetimeEventID

	// action is written as the second payload byte.  Despite its name it
	// holds the "event id" of the description above, i.e. which subsystem
	// is starting or stopping.
	action lifetimeAction
}

// Compile-time assertion that *lifetimeEvent implements pipeMessage.
var _ pipeMessage = (*lifetimeEvent)(nil)

// lifetimeEventID is the first byte of a lifetime event payload.  It
// identifies the lifecycle phase being reported.
type lifetimeEventID byte

// The values below are part of the pipe wire format, so each one is spelled
// out explicitly.
const (
	// startupEvent reports that a startup event is about to run.
	startupEvent lifetimeEventID = 0

	// startupComplete reports that all startup tasks have completed.
	startupComplete lifetimeEventID = 1

	// shutdownEvent reports that a shutdown event is about to run.
	shutdownEvent lifetimeEventID = 2
)

// lifetimeAction is the second byte of a lifetime event payload.  It
// identifies which subsystem or task the event refers to.
type lifetimeAction byte

// The values below are part of the pipe wire format, so each one is spelled
// out explicitly.
const (
	// lifetimeEventDBOpen identifies the database opening/closing.
	lifetimeEventDBOpen lifetimeAction = 0

	// lifetimeEventTicketDB identifies the ticket database opening/closing.
	lifetimeEventTicketDB lifetimeAction = 1

	// lifetimeEventP2PServer identifies the peer-to-peer server
	// starting/stopping.
	lifetimeEventP2PServer lifetimeAction = 2
)

// Type returns the message type string used for every lifetime event.
func (*lifetimeEvent) Type() string {
	const messageType = "lifetimeevent"
	return messageType
}
// PayloadSize returns the payload length of a lifetime event, which is always
// two bytes.
func (*lifetimeEvent) PayloadSize() uint32 {
	const payloadLen uint32 = 2
	return payloadLen
}
// WritePayload writes the two payload bytes to w: the event field followed by
// the action field.  It returns any error reported by w.
func (e *lifetimeEvent) WritePayload(w io.Writer) error {
	payload := [2]byte{byte(e.event), byte(e.action)}
	if _, err := w.Write(payload[:]); err != nil {
		return err
	}
	return nil
}

// lifetimeEventServer is a send-only channel of pipe messages on which
// lifetime events are published.  A nil lifetimeEventServer is valid: its
// notify methods return without sending anything.
type lifetimeEventServer chan<- pipeMessage

// newLifetimeEventServer returns a lifetimeEventServer that publishes its
// events on outChan.
func newLifetimeEventServer(outChan chan<- pipeMessage) lifetimeEventServer {
	// The channel type is assignable to the named type without an explicit
	// conversion.
	return outChan
}

// notifyStartupEvent sends a startup lifetime event for the given action.  It
// does nothing when the server is nil.
func (s lifetimeEventServer) notifyStartupEvent(action lifetimeAction) {
	if s != nil {
		ev := lifetimeEvent{event: startupEvent, action: action}
		s <- &ev
	}
}

// notifyStartupComplete sends the lifetime event reporting that startup has
// completed.  It does nothing when the server is nil.
func (s lifetimeEventServer) notifyStartupComplete() {
	if s != nil {
		// The action field is left at its zero value.
		ev := lifetimeEvent{event: startupComplete}
		s <- &ev
	}
}

// notifyShutdownEvent sends a shutdown lifetime event for the given action.
// It does nothing when the server is nil.
func (s lifetimeEventServer) notifyShutdownEvent(action lifetimeAction) {
	if s != nil {
		ev := lifetimeEvent{event: shutdownEvent, action: action}
		s <- &ev
	}
}
Loading

0 comments on commit e0c3df8

Please sign in to comment.