Skip to content

Commit

Permalink
Expose Runner interface to allow custom command runner implementations (
Browse files Browse the repository at this point in the history
#270)

* Adds a new set of interfaces in a `runner` package to allow custom implementations for running plugins.
* Adds 3 new client-facing config options:
  * `RunnerFunc` can be supplied to customise how a plugin is run, with `*exec.Cmd` treated as the spec.
  * `SkipHostEnv` tells go-plugin not to add the host process' environment variables to the command spec
  * `ReattachConfig.ReattachFunc` can be supplied to hook into running plugins that are identified by something other than a process ID
* From a server (plugin) point of view; Adds optional environment variables to control the plugin's listener unix sockets
* Adds `CmdRunner` as the default `Runner` implementation, maintaining existing functionality and backwards compatibility
  • Loading branch information
tomhjp committed Aug 22, 2023
1 parent f31f0fb commit de19819
Show file tree
Hide file tree
Showing 19 changed files with 655 additions and 124 deletions.
169 changes: 96 additions & 73 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/cmdrunner"
"github.com/hashicorp/go-plugin/runner"
"google.golang.org/grpc"
)

const unrecognizedRemotePluginMessage = `Unrecognized remote plugin message: %s
const unrecognizedRemotePluginMessage = `Unrecognized remote plugin message: %q
This usually means
the plugin was not compiled for this architecture,
the plugin is missing dynamic-link libraries necessary to run,
Expand All @@ -52,7 +54,7 @@ var managedClientsLock sync.Mutex
var (
// ErrProcessNotFound is returned when a client is instantiated to
// reattach to an existing process and it isn't found.
ErrProcessNotFound = errors.New("Reattachment process not found")
ErrProcessNotFound = cmdrunner.ErrProcessNotFound

// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
// the one provided in the SecureConfig.
Expand Down Expand Up @@ -87,7 +89,7 @@ type Client struct {
exited bool
l sync.Mutex
address net.Addr
process *os.Process
runner runner.AttachedRunner
client ClientProtocol
protocol Protocol
logger hclog.Logger
Expand All @@ -106,6 +108,8 @@ type Client struct {
// processKilled is used for testing only, to flag when the process was
// forcefully killed.
processKilled bool

hostSocketDir string
}

// NegotiatedVersion returns the protocol version negotiated with the server.
Expand Down Expand Up @@ -141,6 +145,13 @@ type ClientConfig struct {
Cmd *exec.Cmd
Reattach *ReattachConfig

// RunnerFunc allows consumers to provide their own implementation of
// runner.Runner and control the context within which a plugin is executed.
// The cmd argument will have been copied from the config and populated with
// environment variables that a go-plugin server expects to read such as
// AutoMTLS certs and the magic cookie key.
RunnerFunc func(l hclog.Logger, cmd *exec.Cmd, tmpDir string) (runner.Runner, error)

// SecureConfig is configuration for verifying the integrity of the
// executable. It can not be used with Reattach.
SecureConfig *SecureConfig
Expand Down Expand Up @@ -220,6 +231,10 @@ type ClientConfig struct {
// to create gRPC connections. This only affects plugins using the gRPC
// protocol.
GRPCDialOptions []grpc.DialOption

// SkipHostEnv allows plugins to run without inheriting the parent process'
// environment variables.
SkipHostEnv bool
}

// ReattachConfig is used to configure a client to reattach to an
Expand All @@ -231,6 +246,11 @@ type ReattachConfig struct {
Addr net.Addr
Pid int

// ReattachFunc allows consumers to provide their own implementation of
// runner.AttachedRunner and attach to something other than a plain process.
// At least one of Pid or ReattachFunc must be set.
ReattachFunc runner.ReattachFunc

// Test is set to true if this is reattaching to to a plugin in "test mode"
// (see ServeConfig.Test). In this mode, client.Kill will NOT kill the
// process and instead will rely on the plugin to terminate itself. This
Expand Down Expand Up @@ -418,23 +438,28 @@ func (c *Client) killed() bool {
func (c *Client) Kill() {
// Grab a lock to read some private fields.
c.l.Lock()
process := c.process
runner := c.runner
addr := c.address
hostSocketDir := c.hostSocketDir
c.l.Unlock()

// If there is no process, there is nothing to kill.
if process == nil {
// If there is no runner or ID, there is nothing to kill.
if runner == nil || runner.ID() == "" {
return
}

defer func() {
// Wait for the all client goroutines to finish.
c.clientWaitGroup.Wait()

if hostSocketDir != "" {
os.RemoveAll(hostSocketDir)
}

// Make sure there is no reference to the old process after it has been
// killed.
c.l.Lock()
c.process = nil
c.runner = nil
c.l.Unlock()
}()

Expand Down Expand Up @@ -477,7 +502,9 @@ func (c *Client) Kill() {

// If graceful exiting failed, just kill it
c.logger.Warn("plugin failed to exit gracefully")
process.Kill()
if err := runner.Kill(); err != nil {
c.logger.Debug("error killing plugin", "error", err)
}

c.l.Lock()
c.processKilled = true
Expand Down Expand Up @@ -516,7 +543,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
attachSet := c.config.Reattach != nil
secureSet := c.config.SecureConfig != nil
if cmdSet == attachSet {
return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
return nil, fmt.Errorf("exactly one of Cmd or Reattach must be set")
}

if secureSet && attachSet {
Expand Down Expand Up @@ -555,19 +582,12 @@ func (c *Client) Start() (addr net.Addr, err error) {
}

cmd := c.config.Cmd
cmd.Env = append(cmd.Env, os.Environ()...)
if !c.config.SkipHostEnv {
cmd.Env = append(cmd.Env, os.Environ()...)
}
cmd.Env = append(cmd.Env, env...)
cmd.Stdin = os.Stdin

cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}

if c.config.SecureConfig != nil {
if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
return nil, fmt.Errorf("error verifying checksum: %s", err)
Expand Down Expand Up @@ -601,26 +621,42 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}

c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
err = cmd.Start()
if err != nil {
return
var runner runner.Runner
switch {
case c.config.RunnerFunc != nil:
c.hostSocketDir, err = os.MkdirTemp("", "")
if err != nil {
return nil, err
}
c.logger.Trace("created temporary directory for unix sockets", "dir", c.hostSocketDir)
runner, err = c.config.RunnerFunc(c.logger, cmd, c.hostSocketDir)
if err != nil {
return nil, err
}
default:
runner, err = cmdrunner.NewCmdRunner(c.logger, cmd)
if err != nil {
return nil, err
}

}

// Set the process
c.process = cmd.Process
c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
c.runner = runner
err = runner.Start()
if err != nil {
return nil, err
}

// Make sure the command is properly cleaned up if there is an error
defer func() {
r := recover()
rErr := recover()

if err != nil || r != nil {
cmd.Process.Kill()
if err != nil || rErr != nil {
runner.Kill()
}

if r != nil {
panic(r)
if rErr != nil {
panic(rErr)
}
}()

Expand All @@ -631,7 +667,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
c.clientWaitGroup.Add(1)
c.stderrWaitGroup.Add(1)
// logStderr calls Done()
go c.logStderr(cmdStderr)
go c.logStderr(runner.Name(), runner.Stderr())

c.clientWaitGroup.Add(1)
go func() {
Expand All @@ -640,29 +676,17 @@ func (c *Client) Start() (addr net.Addr, err error) {

defer c.clientWaitGroup.Done()

// get the cmd info early, since the process information will be removed
// in Kill.
pid := c.process.Pid
path := cmd.Path

// wait to finish reading from stderr since the stderr pipe reader
// will be closed by the subsequent call to cmd.Wait().
c.stderrWaitGroup.Wait()

// Wait for the command to end.
err := cmd.Wait()

msgArgs := []interface{}{
"path", path,
"pid", pid,
}
err := runner.Wait()
if err != nil {
msgArgs = append(msgArgs,
[]interface{}{"error", err.Error()}...)
c.logger.Error("plugin process exited", msgArgs...)
c.logger.Error("plugin process exited", "plugin", runner.Name(), "id", runner.ID(), "error", err.Error())
} else {
// Log and make sure to flush the logs right away
c.logger.Info("plugin process exited", msgArgs...)
c.logger.Info("plugin process exited", "plugin", runner.Name(), "id", runner.ID())
}

os.Stderr.Sync()
Expand All @@ -681,10 +705,13 @@ func (c *Client) Start() (addr net.Addr, err error) {
defer c.clientWaitGroup.Done()
defer close(linesCh)

scanner := bufio.NewScanner(cmdStdout)
scanner := bufio.NewScanner(runner.Stdout())
for scanner.Scan() {
linesCh <- scanner.Text()
}
if scanner.Err() != nil {
c.logger.Error("error encountered while scanning stdout", "error", scanner.Err())
}
}()

// Make sure after we exit we read the lines from stdout forever
Expand Down Expand Up @@ -751,13 +778,18 @@ func (c *Client) Start() (addr net.Addr, err error) {
c.negotiatedVersion = version
c.logger.Debug("using plugin", "version", version)

switch parts[2] {
network, address, err := runner.PluginToHost(parts[2], parts[3])
if err != nil {
return addr, err
}

switch network {
case "tcp":
addr, err = net.ResolveTCPAddr("tcp", parts[3])
addr, err = net.ResolveTCPAddr("tcp", address)
case "unix":
addr, err = net.ResolveUnixAddr("unix", parts[3])
addr, err = net.ResolveUnixAddr("unix", address)
default:
err = fmt.Errorf("Unknown address type: %s", parts[3])
err = fmt.Errorf("Unknown address type: %s", address)
}

// If we have a server type, then record that. We default to net/rpc
Expand Down Expand Up @@ -818,39 +850,30 @@ func (c *Client) loadServerCert(cert string) error {
}

func (c *Client) reattach() (net.Addr, error) {
// Verify the process still exists. If not, then it is an error
p, err := os.FindProcess(c.config.Reattach.Pid)
if err != nil {
// On Unix systems, FindProcess never returns an error.
// On Windows, for non-existent pids it returns:
// os.SyscallError - 'OpenProcess: the paremter is incorrect'
return nil, ErrProcessNotFound
reattachFunc := c.config.Reattach.ReattachFunc
// For backwards compatibility default to cmdrunner.ReattachFunc
if reattachFunc == nil {
reattachFunc = cmdrunner.ReattachFunc(c.config.Reattach.Pid, c.config.Reattach.Addr)
}

// Attempt to connect to the addr since on Unix systems FindProcess
// doesn't actually return an error if it can't find the process.
conn, err := net.Dial(
c.config.Reattach.Addr.Network(),
c.config.Reattach.Addr.String())
r, err := reattachFunc()
if err != nil {
p.Kill()
return nil, ErrProcessNotFound
return nil, err
}
conn.Close()

// Create a context for when we kill
c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())

c.clientWaitGroup.Add(1)
// Goroutine to mark exit status
go func(pid int) {
go func(r runner.AttachedRunner) {
defer c.clientWaitGroup.Done()

// ensure the context is cancelled when we're done
defer c.ctxCancel()

// Wait for the process to die
pidWait(pid)
r.Wait()

// Log so we can see it
c.logger.Debug("reattached plugin process exited")
Expand All @@ -859,7 +882,7 @@ func (c *Client) reattach() (net.Addr, error) {
c.l.Lock()
defer c.l.Unlock()
c.exited = true
}(p.Pid)
}(r)

// Set the address and protocol
c.address = c.config.Reattach.Addr
Expand All @@ -877,7 +900,7 @@ func (c *Client) reattach() (net.Addr, error) {
// process being killed (the only purpose we have for c.process), since
// in test mode the process is responsible for exiting on its own.
if !c.config.Reattach.Test {
c.process = p
c.runner = r
}

return c.address, nil
Expand Down Expand Up @@ -989,10 +1012,10 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {

var stdErrBufferSize = 64 * 1024

func (c *Client) logStderr(r io.Reader) {
func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, stdErrBufferSize)
// continuation indicates the previous line was a prefix
Expand Down
Loading

0 comments on commit de19819

Please sign in to comment.