diff --git a/hiveproxy/proxy.go b/hiveproxy/proxy.go index be6ad3e502..ab9316541a 100644 --- a/hiveproxy/proxy.go +++ b/hiveproxy/proxy.go @@ -42,6 +42,7 @@ func init() { type Proxy struct { httpsrv http.Server rpc *rpc.Client + waitCh <-chan struct{} serverDown chan struct{} closeOnce sync.Once @@ -49,13 +50,19 @@ type Proxy struct { callID uint64 } -func newProxy(front bool) *Proxy { +func newProxy(front bool, waitCh <-chan struct{}) *Proxy { return &Proxy{ serverDown: make(chan struct{}), + waitCh: waitCh, isFront: front, } } +// Wait blocks until the proxy connection is closed. +func (p *Proxy) Wait() { + <-p.waitCh +} + // Close terminates the proxy. func (p *Proxy) Close() { p.closeOnce.Do(func() { @@ -118,14 +125,14 @@ func (p *Proxy) launchRPC(stream net.Conn) { // // All communication with the backend runs over the given r,w streams. func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) (*Proxy, error) { - p := newProxy(true) mux, err := yamux.Client(rwCombo{r, w}, muxcfg) if err != nil { return nil, err } + p := newProxy(true, mux.CloseChan()) // Launch RPC handler. - rpcConn, err := mux.Open() + rpcConn, err := mux.Accept() if err != nil { mux.Close() return nil, err @@ -156,14 +163,15 @@ func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) (*Proxy, // // All communication with the frontend runs over the given r,w streams. func RunBackend(r io.Reader, w io.WriteCloser, h http.Handler) (*Proxy, error) { - p := newProxy(false) mux, err := yamux.Server(rwCombo{r, w}, muxcfg) if err != nil { return nil, err } + p := newProxy(false, mux.CloseChan()) + // Start RPC client. - rpcConn, err := mux.Accept() + rpcConn, err := mux.Open() if err != nil { mux.Close() return nil, err diff --git a/hiveproxy/proxy_test.go b/hiveproxy/proxy_test.go index 981ba2de66..6ebf83909c 100644 --- a/hiveproxy/proxy_test.go +++ b/hiveproxy/proxy_test.go @@ -61,6 +61,24 @@ func TestProxyCheckLiveCancel(t *testing.T) { t.Log(err) } +func TestProxyWait(t *testing.T) { + p := runProxyPair(t, nil) + + unblock := make(chan struct{}) + go func() { + p.front.Wait() + close(unblock) + }() + + p.back.Close() + + select { + case <-unblock: + case <-time.After(5 * time.Second): + t.Fatal("Wait did not unblock") + } +} + type proxyPair struct { front *Proxy back *Proxy diff --git a/hiveproxy/tool/main.go b/hiveproxy/tool/main.go index 82dede7ccf..dff5e60276 100644 --- a/hiveproxy/tool/main.go +++ b/hiveproxy/tool/main.go @@ -16,6 +16,9 @@ func main() { if err != nil { panic(err) } - hiveproxy.RunFrontend(os.Stdin, os.Stdout, l) - select {} + p, err := hiveproxy.RunFrontend(os.Stdin, os.Stdout, l) + if err != nil { + panic(err) + } + p.Wait() } diff --git a/internal/libdocker/container.go b/internal/libdocker/container.go index 8d3d928676..849bd7bae4 100644 --- a/internal/libdocker/container.go +++ b/internal/libdocker/container.go @@ -93,6 +93,11 @@ func (b *ContainerBackend) CreateContainer(ctx context.Context, imageName string createOpts.Config.StdinOnce = true createOpts.Config.OpenStdin = true } + if opt.Output != nil { + // Pre-announce that stdout will be attached. Not sure if this does anything, + // but it's probably best to give Docker the info as early as possible. + createOpts.Config.AttachStdout = true + } c, err := b.client.CreateContainer(createOpts) if err != nil { @@ -133,8 +138,9 @@ func (b *ContainerBackend) StartContainer(ctx context.Context, containerID strin go func() { defer close(containerExit) err := waiter.Wait() - waiter.Close() logger.Debug("container exited", "err", err) + err = waiter.Close() + logger.Debug("container files closed", "err", err) }() // Set up the wait function. info.Wait = func() { <-containerExit } diff --git a/internal/libdocker/proxy.go b/internal/libdocker/proxy.go index 179fe5b403..6d84719b24 100644 --- a/internal/libdocker/proxy.go +++ b/internal/libdocker/proxy.go @@ -29,16 +29,35 @@ func (cb *ContainerBackend) ServeAPI(ctx context.Context, h http.Handler) (libhi if err != nil { return nil, err } + + // Launch the proxy server before starting the container. + var ( + proxy *hiveproxy.Proxy + proxyErrC = make(chan error, 1) + ) + go func() { + var err error + proxy, err = hiveproxy.RunBackend(outR, inW, h) + if err != nil { + log15.Error("proxy backend startup failed", "err", err) + } + proxyErrC <- err + }() + + // Now start the container. info, err := cb.StartContainer(ctx, id, opts) if err != nil { cb.DeleteContainer(id) return nil, err } - proxy, err := hiveproxy.RunBackend(outR, inW, h) - if err != nil { - cb.DeleteContainer(id) - return nil, err + // Proxy server should come up. + select { + case err := <-proxyErrC: + if err != nil { + cb.DeleteContainer(id) + return nil, err + } } // Register proxy in ContainerBackend, so it can be used for CheckLive.