Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (d *Daemon) ContainerExecStart(ctx context.Context, name string, stdin io.R

attachErr := container.AttachStreams(ctx, ec.StreamConfig, ec.OpenStdin, true, ec.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)

if err := d.containerd.AddProcess(ctx, c.ID, name, p); err != nil {
if err := d.containerd.AddProcess(ctx, c.ID, name, p, d.AttachExecStreams(ec)); err != nil {
return err
}

Expand Down
49 changes: 47 additions & 2 deletions daemon/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/runconfig"
)
Expand Down Expand Up @@ -101,10 +102,18 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error {
return nil
}

// AttachStreams is called by libcontainerd to connect the stdio.
func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {
// AttachStreams is called by libcontainerd to connect the stdio to a container.
func (daemon *Daemon) AttachContainerStreams(id string, iop libcontainerd.IOPipe) error {
var s *runconfig.StreamConfig
c := daemon.containers.Get(id)

// Ensure we're only called for the container's main process. There's
// logic to be simplified in this function after this, but we're trying
// to keep the patch small.
if c == nil {
return fmt.Errorf("no such container: %s", id)
}

if c == nil {
ec, err := daemon.getExecConfig(id)
if err != nil {
Expand Down Expand Up @@ -154,3 +163,39 @@ func (daemon *Daemon) AttachStreams(id string, iop libcontainerd.IOPipe) error {

return nil
}

// AttachExecStreams is used as a callback for libcontainerd to connect the
// stdio streams of an exec process to a dockerd client, using the StreamConfig
// in the exec.Config rather than the container's main PID's stdio.
func (daemon *Daemon) AttachExecStreams(config *exec.Config) func(id string, iop libcontainerd.IOPipe) error {
return func(id string, iop libcontainerd.IOPipe) error {
s := config.StreamConfig
copyFunc := func(w io.Writer, r io.Reader) {
s.Add(1)
go func() {
if _, err := io.Copy(w, r); err != nil {
logrus.Errorf("%v stream copy error: %v", id, err)
}
s.Done()
}()
}

if iop.Stdout != nil {
copyFunc(s.Stdout(), iop.Stdout)
}
if iop.Stderr != nil {
copyFunc(s.Stderr(), iop.Stderr)
}

if stdin := s.Stdin(); stdin != nil {
if iop.Stdin != nil {
go func() {
io.Copy(iop.Stdin, stdin)
iop.Stdin.Close()
}()
}
}

return nil
}
}
10 changes: 3 additions & 7 deletions libcontainerd/client_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type client struct {
liveRestore bool
}

func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
Expand Down Expand Up @@ -98,13 +98,9 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly

container.processes[processFriendlyName] = p

clnt.unlock(containerID)

if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
if err := attachStdio(processFriendlyName, *iopipe); err != nil {
return err
}
clnt.lock(containerID)

return nil
}
Expand Down Expand Up @@ -422,7 +418,7 @@ func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Ev
return err
}

if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
if err := clnt.backend.AttachContainerStreams(containerID, *iopipe); err != nil {
return err
}

Expand Down
6 changes: 5 additions & 1 deletion libcontainerd/client_solaris.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type client struct {
// Platform specific properties below here.
}

func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio ProcessStreamAttacher) error {
return nil
}

Expand All @@ -32,6 +32,10 @@ func (clnt *client) Resume(containerID string) error {
return nil
}

func (clnt *client) SignalProcess(containerID string, processFriendlyName string, sig int) error {
return nil
}

func (clnt *client) Stats(containerID string) (*Stats, error) {
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions libcontainerd/client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (clnt *client) Create(containerID string, spec Spec, options ...CreateOptio

// AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process) error {
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, procToAdd Process, attachStdio ProcessStreamAttacher) error {
clnt.lock(containerID)
defer clnt.unlock(containerID)
container, err := clnt.getContainer(containerID)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendly
clnt.unlock(containerID)

// Tell the engine to attach streams back to the client
if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
if err := attachStdio(processFriendlyName, *iopipe); err != nil {
clnt.lock(containerID)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion libcontainerd/container_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (ctr *container) start() error {
}
ctr.startedAt = time.Now()

if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil {
return err
}
ctr.systemPid = systemPid(resp.Container)
Expand Down
2 changes: 1 addition & 1 deletion libcontainerd/container_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (ctr *container) start() error {

ctr.client.appendContainer(ctr)

if err := ctr.client.backend.AttachStreams(ctr.containerID, *iopipe); err != nil {
if err := ctr.client.backend.AttachContainerStreams(ctr.containerID, *iopipe); err != nil {
// OK to return the error here, as waitExit will handle tear-down in HCS
return err
}
Expand Down
6 changes: 4 additions & 2 deletions libcontainerd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ type CommonStateInfo struct { // FIXME: event?
// Backend defines callbacks that the client of the library needs to implement.
type Backend interface {
StateChanged(containerID string, state StateInfo) error
AttachStreams(processFriendlyName string, io IOPipe) error
AttachContainerStreams(processFriendlyName string, io IOPipe) error
}

type ProcessStreamAttacher func(processFriendlyName string, io IOPipe) error

// Client provides access to containerd features.
type Client interface {
Create(containerID string, spec Spec, options ...CreateOption) error
Signal(containerID string, sig int) error
SignalProcess(containerID string, processFriendlyName string, sig int) error
AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process) error
AddProcess(ctx context.Context, containerID, processFriendlyName string, process Process, attachStdio ProcessStreamAttacher) error
Resize(containerID, processFriendlyName string, width, height int) error
Pause(containerID string) error
Resume(containerID string) error
Expand Down