Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions hiveproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,27 @@ func init() {
type Proxy struct {
httpsrv http.Server
rpc *rpc.Client
waitCh <-chan struct{}
serverDown chan struct{}
closeOnce sync.Once

isFront bool
callID uint64
}

func newProxy(front bool) *Proxy {
func newProxy(front bool, waitCh <-chan struct{}) *Proxy {
return &Proxy{
serverDown: make(chan struct{}),
waitCh: waitCh,
isFront: front,
}
}

// Wait blocks until the proxy connection is closed.
func (p *Proxy) Wait() {
<-p.waitCh
}

// Close terminates the proxy.
func (p *Proxy) Close() {
p.closeOnce.Do(func() {
Expand Down Expand Up @@ -118,14 +125,14 @@ func (p *Proxy) launchRPC(stream net.Conn) {
//
// All communication with the backend runs over the given r,w streams.
func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) (*Proxy, error) {
p := newProxy(true)
mux, err := yamux.Client(rwCombo{r, w}, muxcfg)
if err != nil {
return nil, err
}
p := newProxy(true, mux.CloseChan())

// Launch RPC handler.
rpcConn, err := mux.Open()
rpcConn, err := mux.Accept()
if err != nil {
mux.Close()
return nil, err
Expand Down Expand Up @@ -156,14 +163,15 @@ func RunFrontend(r io.Reader, w io.WriteCloser, listener net.Listener) (*Proxy,
//
// All communication with the frontend runs over the given r,w streams.
func RunBackend(r io.Reader, w io.WriteCloser, h http.Handler) (*Proxy, error) {
p := newProxy(false)
mux, err := yamux.Server(rwCombo{r, w}, muxcfg)
if err != nil {
return nil, err
}

p := newProxy(false, mux.CloseChan())

// Start RPC client.
rpcConn, err := mux.Accept()
rpcConn, err := mux.Open()
if err != nil {
mux.Close()
return nil, err
Expand Down
18 changes: 18 additions & 0 deletions hiveproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ func TestProxyCheckLiveCancel(t *testing.T) {
t.Log(err)
}

func TestProxyWait(t *testing.T) {
p := runProxyPair(t, nil)

unblock := make(chan struct{})
go func() {
p.front.Wait()
close(unblock)
}()

p.back.Close()

select {
case <-unblock:
case <-time.After(5 * time.Second):
t.Fatal("Wait did not unblock")
}
}

type proxyPair struct {
front *Proxy
back *Proxy
Expand Down
7 changes: 5 additions & 2 deletions hiveproxy/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func main() {
if err != nil {
panic(err)
}
hiveproxy.RunFrontend(os.Stdin, os.Stdout, l)
select {}
p, err := hiveproxy.RunFrontend(os.Stdin, os.Stdout, l)
if err != nil {
panic(err)
}
p.Wait()
}
8 changes: 7 additions & 1 deletion internal/libdocker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (b *ContainerBackend) CreateContainer(ctx context.Context, imageName string
createOpts.Config.StdinOnce = true
createOpts.Config.OpenStdin = true
}
if opt.Output != nil {
// Pre-announce that stdout will be attached. Not sure if this does anything,
// but it's probably best to give Docker the info as early as possible.
createOpts.Config.AttachStdout = true
}

c, err := b.client.CreateContainer(createOpts)
if err != nil {
Expand Down Expand Up @@ -133,8 +138,9 @@ func (b *ContainerBackend) StartContainer(ctx context.Context, containerID strin
go func() {
defer close(containerExit)
err := waiter.Wait()
waiter.Close()
logger.Debug("container exited", "err", err)
err = waiter.Close()
logger.Debug("container files closed", "err", err)
}()
// Set up the wait function.
info.Wait = func() { <-containerExit }
Expand Down
27 changes: 23 additions & 4 deletions internal/libdocker/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,35 @@ func (cb *ContainerBackend) ServeAPI(ctx context.Context, h http.Handler) (libhi
if err != nil {
return nil, err
}

// Launch the proxy server before starting the container.
var (
proxy *hiveproxy.Proxy
proxyErrC = make(chan error, 1)
)
go func() {
var err error
proxy, err = hiveproxy.RunBackend(outR, inW, h)
if err != nil {
log15.Error("proxy backend startup failed", "err", err)
}
proxyErrC <- err
}()

// Now start the container.
info, err := cb.StartContainer(ctx, id, opts)
if err != nil {
cb.DeleteContainer(id)
return nil, err
}

proxy, err := hiveproxy.RunBackend(outR, inW, h)
if err != nil {
cb.DeleteContainer(id)
return nil, err
// Proxy server should come up.
select {
case err := <-proxyErrC:
if err != nil {
cb.DeleteContainer(id)
return nil, err
}
}

// Register proxy in ContainerBackend, so it can be used for CheckLive.
Expand Down