Skip to content
Merged
213 changes: 213 additions & 0 deletions pkg/debuginfo/reader/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package reader

import (
"context"
"fmt"
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func chunksFunc(chunks [][]byte) func() ([]byte, error) {
i := 0
return func() ([]byte, error) {
if i >= len(chunks) {
return nil, io.EOF
}
chunk := chunks[i]
i++
return chunk, nil
}
}

func TestUploadReader_Read(t *testing.T) {
t.Parallel()

t.Run("single chunk", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc([][]byte{[]byte("hello")}))
data, err := io.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, "hello", string(data))
})

t.Run("multiple chunks", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc([][]byte{
[]byte("abc"),
[]byte("def"),
[]byte("ghi"),
}))
data, err := io.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, "abcdefghi", string(data))
})

t.Run("empty stream", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc(nil))
data, err := io.ReadAll(r)
require.NoError(t, err)
assert.Empty(t, data)
})

t.Run("small buffer", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc([][]byte{[]byte("hello world")}))
buf := make([]byte, 3)
var result []byte
for {
n, err := r.Read(buf)
result = append(result, buf[:n]...)
if err == io.EOF {
break
}
require.NoError(t, err)
}
assert.Equal(t, "hello world", string(result))
})

t.Run("nextFunc error on first call", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), func() ([]byte, error) {
return nil, fmt.Errorf("network error")
})
buf := make([]byte, 1024)
_, err := r.Read(buf)
require.Error(t, err)
assert.Contains(t, err.Error(), "network error")
})

t.Run("error after some chunks", func(t *testing.T) {
t.Parallel()
callCount := 0
r := New(context.Background(), func() ([]byte, error) {
callCount++
if callCount == 1 {
return []byte("abc"), nil
}
return nil, fmt.Errorf("broken")
})

// First read should succeed with data from the first chunk.
buf := make([]byte, 1024)
n, err := r.Read(buf)
require.NoError(t, err)
assert.Equal(t, "abc", string(buf[:n]))

// Next read should get an error when fetching the next chunk.
_, err = r.Read(buf)
require.Error(t, err)
assert.Contains(t, err.Error(), "broken")
})
}

func TestUploadReader_Size(t *testing.T) {
t.Parallel()

t.Run("no data", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc(nil))
_, _ = io.ReadAll(r)
assert.Equal(t, uint64(0), r.Size())
})

t.Run("after reading all", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc([][]byte{
[]byte("hello"),
[]byte("world"),
}))
_, err := io.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, uint64(10), r.Size())
})

t.Run("tracks incrementally", func(t *testing.T) {
t.Parallel()
r := New(context.Background(), chunksFunc([][]byte{
[]byte("aaa"),
[]byte("bbb"),
}))

assert.Equal(t, uint64(0), r.Size())

buf := make([]byte, 1024)
n, err := r.Read(buf)
require.NoError(t, err)
assert.Equal(t, 3, n)
assert.Equal(t, uint64(3), r.Size())
})
}

func TestUploadReader_ContextCancellation(t *testing.T) {
t.Parallel()

t.Run("cancelled before first read", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
cancel()

r := New(ctx, func() ([]byte, error) {
t.Fatal("nextFunc should not be called when context is cancelled")
return nil, nil
})

buf := make([]byte, 1024)
_, err := r.Read(buf)
require.Error(t, err)
})

t.Run("cancelled between chunks", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())

callCount := 0
r := New(ctx, func() ([]byte, error) {
callCount++
if callCount == 1 {
return []byte("first"), nil
}
// Cancel context before returning the second chunk.
cancel()
return []byte("second"), nil
})

// Read the first chunk.
buf := make([]byte, 1024)
n, err := r.Read(buf)
require.NoError(t, err)
assert.Equal(t, "first", string(buf[:n]))

// The second chunk was returned but context was cancelled.
// The reader should detect the cancelled context on the next next() call.
// Read the second chunk (already fetched by internal logic).
n, err = r.Read(buf)
if err != nil {
// Context cancellation detected
return
}
assert.Equal(t, "second", string(buf[:n]))

// Now the third call to next() should detect cancelled context.
_, err = r.Read(buf)
require.Error(t, err)
})
}

func TestUploadReader_ReadAll(t *testing.T) {
t.Parallel()

r := New(context.Background(), chunksFunc([][]byte{
[]byte("The quick "),
[]byte("brown fox "),
[]byte("jumps over the lazy dog"),
}))

data, err := io.ReadAll(r)
require.NoError(t, err)
assert.Equal(t, "The quick brown fox jumps over the lazy dog", string(data))
assert.Equal(t, uint64(len("The quick brown fox jumps over the lazy dog")), r.Size())
}
15 changes: 7 additions & 8 deletions pkg/debuginfo/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/thanos-io/objstore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -121,7 +119,7 @@ func (s *Store) Upload(ctx context.Context, stream *connect.BidiStream[debuginfo
return connect.NewError(connect.CodeInternal, fmt.Errorf("invalid init request: %w", err))
}

l := log.With(s.logger, "gnu_build_id", init.File.GNU)
l := log.With(s.logger, "gnu_build_id", init.File.GnuBuildId)

if !s.cfg.Enabled { // move this to shouldInitiateUpload func

Expand Down Expand Up @@ -168,8 +166,9 @@ func (s *Store) Upload(ctx context.Context, stream *connect.BidiStream[debuginfo

r := debuginforeader.New(ctx, readChunksFromStream(stream))
if err := s.bucket.Upload(ctx, ObjectPath(tenantID, id), r); err != nil {
return status.Error(codes.Internal, fmt.Errorf("upload debuginfo: %w", err).Error())
return connect.NewError(connect.CodeInternal, fmt.Errorf("upload debuginfo: %w", err))
}
md.State = debuginfov1alpha1.ObjectMetadata_STATE_UPLOADED
md.FinishedAt = timestamppb.New(time.Now())
if err := s.writeMetadata(ctx, tenantID, id, md); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to write uploaded metadata: %w", err))
Expand Down Expand Up @@ -207,10 +206,10 @@ func validateInit(init *debuginfov1alpha1.ShouldInitiateUploadRequest) (*ValidGn
return nil, fmt.Errorf("init.FileData == nil")
}
switch init.File.Type {
case debuginfov1alpha1.FileMetadata_DEBUGINFO_TYPE_EXECUTABLE_FULL:
return ValidateGnuBuildID(init.File.GNU)
case debuginfov1alpha1.FileMetadata_DEBUGINFO_TYPE_EXECUTABLE_NO_TEXT:
return ValidateGnuBuildID(init.File.GNU)
case debuginfov1alpha1.FileMetadata_TYPE_EXECUTABLE_FULL:
return ValidateGnuBuildID(init.File.GnuBuildId)
case debuginfov1alpha1.FileMetadata_TYPE_EXECUTABLE_NO_TEXT:
return ValidateGnuBuildID(init.File.GnuBuildId)
default:
return nil, fmt.Errorf("init.FileData.Type(%d) is not valid", init.File.Type)
}
Expand Down
Loading
Loading