Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 59 additions & 24 deletions container/container.logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package container
import (
"bufio"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"log/slog"

Expand All @@ -17,8 +20,6 @@ func (c *Container) Logger() *slog.Logger {
// Logs will fetch both STDOUT and STDERR from the current container. Returns a
// ReadCloser and leaves it up to the caller to extract what it wants.
func (c *Container) Logs(ctx context.Context) (io.ReadCloser, error) {
const streamHeaderSize = 8

options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Expand All @@ -29,42 +30,76 @@ func (c *Container) Logs(ctx context.Context) (io.ReadCloser, error) {
return nil, err
}

// Check if the container has TTY enabled, to determine the log format
inspect, err := c.Inspect(ctx)
if err != nil {
rc.Close()
return nil, fmt.Errorf("inspect container: %w", err)
}

// If TTY is enabled, logs are not multiplexed - return them directly
if inspect.Config.Tty {
return rc, nil
}

// TTY is disabled, logs are multiplexed with stream headers - parse them
return c.parseMultiplexedLogs(rc), nil
}

// parseMultiplexedLogs handles the multiplexed log format used when TTY is disabled
func (c *Container) parseMultiplexedLogs(rc io.ReadCloser) io.ReadCloser {
const streamHeaderSize = 8

pr, pw := io.Pipe()
r := bufio.NewReader(rc)

go func() {
lineStarted := true
for err == nil {
line, isPrefix, err := r.ReadLine()

if lineStarted && len(line) >= streamHeaderSize {
line = line[streamHeaderSize:] // trim stream header
lineStarted = false
}
if !isPrefix {
lineStarted = true
}
defer rc.Close()

_, errW := pw.Write(line)
if errW != nil {
return
var closeErr error
defer func() {
if r := recover(); r != nil {
closeErr = fmt.Errorf("panic in log processing: %v", r)
}

if !isPrefix {
_, errW := pw.Write([]byte("\n"))
if errW != nil {
return
if closeErr != nil && !errors.Is(closeErr, io.EOF) {
// Real error, close the pipe with the error
if err := pw.CloseWithError(closeErr); err != nil {
c.logger.Debug("failed to close pipe writer with error", "error", err, "original", closeErr)
}
} else {
// No error or EOF, close the pipe normally
if err := pw.Close(); err != nil {
c.logger.Debug("failed to close pipe writer", "error", err)
}
}
}()

// Process the Docker multiplexed stream format which includes:
// - Byte 0: Stream type (1 = stdout, 2 = stderr)
// - Bytes 1-3: Reserved
// - Bytes 4-7: Frame size (big-endian uint32)
streamHeader := make([]byte, streamHeaderSize)

for {
// Read complete stream header - ensures all 8 bytes are read
if _, err := io.ReadFull(r, streamHeader); err != nil {
closeErr = err
break
}

// Extract frame size from header
frameSize := binary.BigEndian.Uint32(streamHeader[4:])

if err != nil {
_ = pw.CloseWithError(err)
return
// Copy frame data
if _, err := io.CopyN(pw, r, int64(frameSize)); err != nil {
closeErr = err
break
}
}
}()

return pr, nil
return pr
}

// printLogs is a helper function that will print the logs of a Docker container
Expand Down
164 changes: 164 additions & 0 deletions container/container.logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package container_test

import (
"bufio"
"bytes"
"context"
"errors"
"io"
"log/slog"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

dockercontainer "github.com/docker/docker/api/types/container"
"github.com/docker/go-sdk/client"
"github.com/docker/go-sdk/container"
"github.com/docker/go-sdk/container/wait"
)

func TestContainer_Logs_fromFailedContainer(t *testing.T) {
ctx := context.Background()
c, err := container.Run(
ctx,
container.WithImage(alpineLatest),
container.WithCmd("echo", "-n", "I was not expecting this"),
container.WithWaitStrategy(wait.ForLog("I was expecting this").WithTimeout(5*time.Second)),
)

container.Cleanup(t, c)
require.ErrorContains(t, err, "container exited with code 0")

logs, logErr := c.Logs(ctx)
require.NoError(t, logErr)

b, err := io.ReadAll(logs)
require.NoError(t, err)

log := string(b)
require.Contains(t, log, "I was not expecting this")
}

func TestContainer_Logs_shouldBeWithoutStreamHeader(t *testing.T) {
ctx := context.Background()
ctr, err := container.Run(ctx,
container.WithImage(alpineLatest),
container.WithCmd("sh", "-c", "echo 'abcdefghi' && echo 'foo'"),
container.WithWaitStrategy(wait.ForExit()),
)
container.Cleanup(t, ctr)
require.NoError(t, err)

r, err := ctr.Logs(ctx)
require.NoError(t, err)
defer r.Close()

b, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, "abcdefghi\nfoo", strings.TrimSpace(string(b)))
}

func TestContainer_Logs_shouldStripHeadersFromStderr(t *testing.T) {
ctx := context.Background()
ctr, err := container.Run(ctx,
container.WithImage(alpineLatest),
container.WithCmd("sh", "-c", "echo 'stdout line' && echo 'stderr line' 1>&2"),
container.WithWaitStrategy(wait.ForExit()),
)
container.Cleanup(t, ctr)
require.NoError(t, err)

r, err := ctr.Logs(ctx)
require.NoError(t, err)
defer r.Close()

b, err := io.ReadAll(r)
require.NoError(t, err)

logs := strings.TrimSpace(string(b))

// Both stdout and stderr should be present without stream headers
require.Contains(t, logs, "stdout line")
require.Contains(t, logs, "stderr line")

// Verify no binary stream headers are present in the output
// Stream headers start with 0x01 (stdout) or 0x02 (stderr)
require.NotContains(t, logs, "\x01")
require.NotContains(t, logs, "\x02")
}

func TestContainer_Logs_printOnError(t *testing.T) {
ctx := context.Background()

buf := new(bytes.Buffer)
logger := slog.New(slog.NewTextHandler(buf, nil))

cli, err := client.New(ctx, client.WithLogger(logger))
require.NoError(t, err)

ctr, err := container.Run(ctx,
container.WithDockerClient(cli),
container.WithImage(alpineLatest),
container.WithCmd("echo", "-n", "I am expecting this"),
container.WithWaitStrategy(wait.ForLog("I was expecting that").WithTimeout(5*time.Second)),
)
container.Cleanup(t, ctr)
// it should fail because the waiting for condition is not met
require.Error(t, err)

containerLogs, err := ctr.Logs(ctx)
require.NoError(t, err)
defer containerLogs.Close()

// read container logs line by line, checking that each line is present in the client's logger
rd := bufio.NewReader(containerLogs)
for {
line, err := rd.ReadString('\n')

// Process the line if we have data, even if there's an EOF error
if line != "" {
// the last line of the array should contain the line of interest,
// but we are checking all the lines to make sure that is present
found := false
for _, l := range strings.Split(buf.String(), "\n") {
if strings.Contains(l, line) {
found = true
break
}
}
require.True(t, found, "container log line not found in the output of the logger: %s", line)
}

// Check for errors after processing any data
if errors.Is(err, io.EOF) {
break
}
require.NoErrorf(t, err, "Read Error")
}
}

func TestContainer_Logs_TTYEnabled(t *testing.T) {
ctx := context.Background()
ctr, err := container.Run(ctx,
container.WithImage(alpineLatest),
container.WithCmd("sh", "-c", "echo 'tty output'"),
container.WithConfigModifier(func(cfg *dockercontainer.Config) {
cfg.Tty = true
}),
container.WithWaitStrategy(wait.ForExit()),
)
container.Cleanup(t, ctr)
require.NoError(t, err)

r, err := ctr.Logs(ctx)
require.NoError(t, err)
defer r.Close()

b, err := io.ReadAll(r)
require.NoError(t, err)

logs := strings.TrimSpace(string(b))
require.Contains(t, logs, "tty output")
}