diff --git a/container/container.logs.go b/container/container.logs.go index 5648256e..911a69ea 100644 --- a/container/container.logs.go +++ b/container/container.logs.go @@ -3,6 +3,9 @@ package container import ( "bufio" "context" + "encoding/binary" + "errors" + "fmt" "io" "log/slog" @@ -17,8 +20,6 @@ func (c *Container) Logger() *slog.Logger { // Logs will fetch both STDOUT and STDERR from the current container. Returns a // ReadCloser and leaves it up to the caller to extract what it wants. func (c *Container) Logs(ctx context.Context) (io.ReadCloser, error) { - const streamHeaderSize = 8 - options := container.LogsOptions{ ShowStdout: true, ShowStderr: true, @@ -29,42 +30,76 @@ func (c *Container) Logs(ctx context.Context) (io.ReadCloser, error) { return nil, err } + // Check if the container has TTY enabled, to determine the log format + inspect, err := c.Inspect(ctx) + if err != nil { + rc.Close() + return nil, fmt.Errorf("inspect container: %w", err) + } + + // If TTY is enabled, logs are not multiplexed - return them directly + if inspect.Config.Tty { + return rc, nil + } + + // TTY is disabled, logs are multiplexed with stream headers - parse them + return c.parseMultiplexedLogs(rc), nil +} + +// parseMultiplexedLogs handles the multiplexed log format used when TTY is disabled +func (c *Container) parseMultiplexedLogs(rc io.ReadCloser) io.ReadCloser { + const streamHeaderSize = 8 + pr, pw := io.Pipe() r := bufio.NewReader(rc) go func() { - lineStarted := true - for err == nil { - line, isPrefix, err := r.ReadLine() - - if lineStarted && len(line) >= streamHeaderSize { - line = line[streamHeaderSize:] // trim stream header - lineStarted = false - } - if !isPrefix { - lineStarted = true - } + defer rc.Close() - _, errW := pw.Write(line) - if errW != nil { - return + var closeErr error + defer func() { + if r := recover(); r != nil { + closeErr = fmt.Errorf("panic in log processing: %v", r) } - if !isPrefix { - _, errW := pw.Write([]byte("\n")) - if errW != nil { - return + if closeErr != nil && !errors.Is(closeErr, io.EOF) { + // Real error, close the pipe with the error + if err := pw.CloseWithError(closeErr); err != nil { + c.logger.Debug("failed to close pipe writer with error", "error", err, "original", closeErr) + } + } else { + // No error or EOF, close the pipe normally + if err := pw.Close(); err != nil { + c.logger.Debug("failed to close pipe writer", "error", err) } } + }() + + // Process the Docker multiplexed stream format which includes: + // - Byte 0: Stream type (1 = stdout, 2 = stderr) + // - Bytes 1-3: Reserved + // - Bytes 4-7: Frame size (big-endian uint32) + streamHeader := make([]byte, streamHeaderSize) + + for { + // Read complete stream header - ensures all 8 bytes are read + if _, err := io.ReadFull(r, streamHeader); err != nil { + closeErr = err + break + } + + // Extract frame size from header + frameSize := binary.BigEndian.Uint32(streamHeader[4:]) - if err != nil { - _ = pw.CloseWithError(err) - return + // Copy frame data + if _, err := io.CopyN(pw, r, int64(frameSize)); err != nil { + closeErr = err + break } } }() - return pr, nil + return pr } // printLogs is a helper function that will print the logs of a Docker container diff --git a/container/container.logs_test.go b/container/container.logs_test.go new file mode 100644 index 00000000..8f6d8a6a --- /dev/null +++ b/container/container.logs_test.go @@ -0,0 +1,164 @@ +package container_test + +import ( + "bufio" + "bytes" + "context" + "errors" + "io" + "log/slog" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + dockercontainer "github.com/docker/docker/api/types/container" + "github.com/docker/go-sdk/client" + "github.com/docker/go-sdk/container" + "github.com/docker/go-sdk/container/wait" +) + +func TestContainer_Logs_fromFailedContainer(t *testing.T) { + ctx := context.Background() + c, err := container.Run( + ctx, + container.WithImage(alpineLatest), + container.WithCmd("echo", "-n", "I was not expecting this"), + container.WithWaitStrategy(wait.ForLog("I was expecting this").WithTimeout(5*time.Second)), + ) + + container.Cleanup(t, c) + require.ErrorContains(t, err, "container exited with code 0") + + logs, logErr := c.Logs(ctx) + require.NoError(t, logErr) + + b, err := io.ReadAll(logs) + require.NoError(t, err) + + log := string(b) + require.Contains(t, log, "I was not expecting this") +} + +func TestContainer_Logs_shouldBeWithoutStreamHeader(t *testing.T) { + ctx := context.Background() + ctr, err := container.Run(ctx, + container.WithImage(alpineLatest), + container.WithCmd("sh", "-c", "echo 'abcdefghi' && echo 'foo'"), + container.WithWaitStrategy(wait.ForExit()), + ) + container.Cleanup(t, ctr) + require.NoError(t, err) + + r, err := ctr.Logs(ctx) + require.NoError(t, err) + defer r.Close() + + b, err := io.ReadAll(r) + require.NoError(t, err) + require.Equal(t, "abcdefghi\nfoo", strings.TrimSpace(string(b))) +} + +func TestContainer_Logs_shouldStripHeadersFromStderr(t *testing.T) { + ctx := context.Background() + ctr, err := container.Run(ctx, + container.WithImage(alpineLatest), + container.WithCmd("sh", "-c", "echo 'stdout line' && echo 'stderr line' 1>&2"), + container.WithWaitStrategy(wait.ForExit()), + ) + container.Cleanup(t, ctr) + require.NoError(t, err) + + r, err := ctr.Logs(ctx) + require.NoError(t, err) + defer r.Close() + + b, err := io.ReadAll(r) + require.NoError(t, err) + + logs := strings.TrimSpace(string(b)) + + // Both stdout and stderr should be present without stream headers + require.Contains(t, logs, "stdout line") + require.Contains(t, logs, "stderr line") + + // Verify no binary stream headers are present in the output + // Stream headers start with 0x01 (stdout) or 0x02 (stderr) + require.NotContains(t, logs, "\x01") + require.NotContains(t, logs, "\x02") +} + +func TestContainer_Logs_printOnError(t *testing.T) { + ctx := context.Background() + + buf := new(bytes.Buffer) + logger := slog.New(slog.NewTextHandler(buf, nil)) + + cli, err := client.New(ctx, client.WithLogger(logger)) + require.NoError(t, err) + + ctr, err := container.Run(ctx, + container.WithDockerClient(cli), + container.WithImage(alpineLatest), + container.WithCmd("echo", "-n", "I am expecting this"), + container.WithWaitStrategy(wait.ForLog("I was expecting that").WithTimeout(5*time.Second)), + ) + container.Cleanup(t, ctr) + // it should fail because the waiting for condition is not met + require.Error(t, err) + + containerLogs, err := ctr.Logs(ctx) + require.NoError(t, err) + defer containerLogs.Close() + + // read container logs line by line, checking that each line is present in the client's logger + rd := bufio.NewReader(containerLogs) + for { + line, err := rd.ReadString('\n') + + // Process the line if we have data, even if there's an EOF error + if line != "" { + // the last line of the array should contain the line of interest, + // but we are checking all the lines to make sure that is present + found := false + for _, l := range strings.Split(buf.String(), "\n") { + if strings.Contains(l, line) { + found = true + break + } + } + require.True(t, found, "container log line not found in the output of the logger: %s", line) + } + + // Check for errors after processing any data + if errors.Is(err, io.EOF) { + break + } + require.NoErrorf(t, err, "Read Error") + } +} + +func TestContainer_Logs_TTYEnabled(t *testing.T) { + ctx := context.Background() + ctr, err := container.Run(ctx, + container.WithImage(alpineLatest), + container.WithCmd("sh", "-c", "echo 'tty output'"), + container.WithConfigModifier(func(cfg *dockercontainer.Config) { + cfg.Tty = true + }), + container.WithWaitStrategy(wait.ForExit()), + ) + container.Cleanup(t, ctr) + require.NoError(t, err) + + r, err := ctr.Logs(ctx) + require.NoError(t, err) + defer r.Close() + + b, err := io.ReadAll(r) + require.NoError(t, err) + + logs := strings.TrimSpace(string(b)) + require.Contains(t, logs, "tty output") +}