diff --git a/daemon/exec.go b/daemon/exec.go index d57b6875d8b8a..ce1ea6eb91873 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys) - if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil { + if err := d.containerd.AddProcess(ctx, c.ID, name, p, d.AttachExecStreams(ec)); err != nil { return err } diff --git a/daemon/monitor.go b/daemon/monitor.go index 1f97efb4726f2..2da5da57f240a 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -8,6 +8,7 @@ import ( "strconv" "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/exec" "github.com/docker/docker/libcontainerd" "github.com/docker/docker/runconfig" ) @@ -101,10 +102,18 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { return nil } -// AttachStreams is called by libcontainerd to connect the stdio. -func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { +// AttachStreams is called by libcontainerd to connect the stdio to a container. +func (daemon *Daemon) AttachContainerStreams(id string, iop libcontainerd.IOPipe) error { var s *runconfig.StreamConfig c := daemon.containers.Get(id) + + // Ensure we're only called for the container's main process. There's + // logic to be simplified in this function after this, but we're trying + // to keep the patch small. + if c == nil { + return fmt.Errorf("no such container: %s", id) + } + if c == nil { ec, err := daemon.getExecConfig(id) if err != nil { @@ -154,3 +163,39 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error { return nil } + +// AttachExecStreams is used as a callback for libcontainerd to connect the +// stdio streams of an exec process to a dockerd client, using the StreamConfig +// in the exec.Config rather than the container's main PID's stdio. +func (daemon *Daemon) AttachExecStreams(config *exec.Config) func(id string, iop libcontainerd.IOPipe) error { + return func(id string, iop libcontainerd.IOPipe) error { + s := config.StreamConfig + copyFunc := func(w io.Writer, r io.Reader) { + s.Add(1) + go func() { + if _, err := io.Copy(w, r); err != nil { + logrus.Errorf("%v stream copy error: %v", id, err) + } + s.Done() + }() + } + + if iop.Stdout != nil { + copyFunc(s.Stdout(), iop.Stdout) + } + if iop.Stderr != nil { + copyFunc(s.Stderr(), iop.Stderr) + } + + if stdin := s.Stdin(); stdin != nil { + if iop.Stdin != nil { + go func() { + io.Copy(iop.Stdin, stdin) + iop.Stdin.Close() + }() + } + } + + return nil + } +} diff --git a/libcontainerd/client_linux.go b/libcontainerd/client_linux.go index ef504cad5939e..e4108ab9cc808 100644 --- a/libcontainerd/client_linux.go +++ b/libcontainerd/client_linux.go @@ -30,7 +30,7 @@ type client struct { liveRestore bool } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -98,13 +98,9 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly container.processes[processFriendlyName] = p - clnt.unlock(containerID) - - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { - clnt.lock(containerID) + if err := attachStdio(processFriendlyName, *iopipe); err != nil { return err } - clnt.lock(containerID) return nil } @@ -422,7 +418,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev return err } - if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil { + if err := clnt.backend.AttachContainerStreams(containerID, *iopipe); err != nil { return err } diff --git a/libcontainerd/client_solaris.go b/libcontainerd/client_solaris.go index 1c14d301b5925..b755f0d72022b 100644 --- a/libcontainerd/client_solaris.go +++ b/libcontainerd/client_solaris.go @@ -8,7 +8,7 @@ type client struct { // Platform specific properties below here. } -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error { return nil } @@ -32,6 +32,10 @@ func (clnt *client) Resume(containerID string) error { return nil } +func (clnt *client) SignalProcess(containerID string, processFriendlyName string, sig int) error { + return nil +} + func (clnt *client) Stats(containerID string) (*Stats, error) { return nil, nil } diff --git a/libcontainerd/client_windows.go b/libcontainerd/client_windows.go index 431574a4d3f83..ea3d00c5f694d 100644 --- a/libcontainerd/client_windows.go +++ b/libcontainerd/client_windows.go @@ -171,7 +171,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio // AddProcess is the handler for adding a process to an already running // container. It's called through docker exec. -func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error { +func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio ProcessStreamAttacher) error { clnt.lock(containerID) defer clnt.unlock(containerID) container, err := clnt.getContainer(containerID) @@ -254,7 +254,7 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly clnt.unlock(containerID) // Tell the engine to attach streams back to the client - if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil { + if err := attachStdio(processFriendlyName, *iopipe); err != nil { clnt.lock(containerID) return err } diff --git a/libcontainerd/container_linux.go b/libcontainerd/container_linux.go index 454478b5c2a03..1ac68981407ab 100644 --- a/libcontainerd/container_linux.go +++ b/libcontainerd/container_linux.go @@ -116,7 +116,7 @@ func (ctr *container) start() error { } ctr.startedAt = time.Now() - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil { return err } ctr.systemPid = systemPid(resp.Container) diff --git a/libcontainerd/container_windows.go b/libcontainerd/container_windows.go index 31c2227df0ae8..7944be41e85a7 100644 --- a/libcontainerd/container_windows.go +++ b/libcontainerd/container_windows.go @@ -142,7 +142,7 @@ func (ctr *container) start() error { ctr.client.appendContainer(ctr) - if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil { + if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil { // OK to return the error here, as waitExit will handle tear-down in HCS return err } diff --git a/libcontainerd/types.go b/libcontainerd/types.go index 6f452c1c3b5c0..66060ddd2f3e0 100644 --- a/libcontainerd/types.go +++ b/libcontainerd/types.go @@ -31,15 +31,17 @@ type CommonStateInfo struct { // FIXME: event? // Backend defines callbacks that the client of the library needs to implement. type Backend interface { StateChanged(containerID string, state StateInfo) error - AttachStreams(processFriendlyName string, io IOPipe) error + AttachContainerStreams(processFriendlyName string, io IOPipe) error } +type ProcessStreamAttacher func(processFriendlyName string, io IOPipe) error + // Client provides access to containerd features. type Client interface { Create(containerID string, spec Spec, options ...CreateOption) error Signal(containerID string, sig int) error SignalProcess(containerID string, processFriendlyName string, sig int) error - AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error + AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio ProcessStreamAttacher) error Resize(containerID, processFriendlyName string, width, height int) error Pause(containerID string) error Resume(containerID string) error