Skip to content

Commit

Permalink
fix: Fixes the response body streaming for GetObject, implementing a …
Browse files Browse the repository at this point in the history
…chunk streamer
  • Loading branch information
niksis02 committed Jan 15, 2025
1 parent cca9fee commit c094086
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 1 deletion.
11 changes: 10 additions & 1 deletion s3api/controllers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,16 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
}

if res.Body != nil {
ctx.Response().SetBodyStream(res.Body, int(getint64(res.ContentLength)))
err := utils.StreamResponseBody(ctx, res.Body)
if err != nil {
SendResponse(ctx, nil,
&MetaOpts{
Logger: c.logger,
MetricsMng: c.mm,
Action: metrics.ActionGetObject,
BucketOwner: parsedAcl.Owner,
})
}
}

return SendResponse(ctx, nil,
Expand Down
24 changes: 24 additions & 0 deletions s3api/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,30 @@ func SetResponseHeaders(ctx *fiber.Ctx, headers []CustomHeader) {
}
}

// Streams the response body by chunks
func StreamResponseBody(ctx *fiber.Ctx, rdr io.ReadCloser) error {
buf := make([]byte, 4096) // 4KB chunks
defer rdr.Close()
for {
n, err := rdr.Read(buf)
if n > 0 {
_, writeErr := ctx.Write(buf[:n])
if writeErr != nil {
return fmt.Errorf("write chunk: %w", writeErr)
}
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}

return fmt.Errorf("read chunk: %w", err)
}
}

return nil
}

func IsValidBucketName(bucket string) bool {
if len(bucket) < 3 || len(bucket) > 63 {
return false
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/group-tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestGetObject(s *S3Conf) {
GetObject_invalid_ranges(s)
GetObject_invalid_parent(s)
GetObject_with_meta(s)
GetObject_large_object(s)
GetObject_success(s)
GetObject_directory_success(s)
GetObject_by_range_success(s)
Expand Down Expand Up @@ -753,6 +754,7 @@ func GetIntTests() IntTests {
"GetObject_invalid_ranges": GetObject_invalid_ranges,
"GetObject_invalid_parent": GetObject_invalid_parent,
"GetObject_with_meta": GetObject_with_meta,
"GetObject_large_object": GetObject_large_object,
"GetObject_success": GetObject_success,
"GetObject_directory_success": GetObject_directory_success,
"GetObject_by_range_success": GetObject_by_range_success,
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

var (
shortTimeout = 10 * time.Second
longTimeout = 60 * time.Second
iso8601Format = "20060102T150405Z"
nullVersionId = "null"
)
Expand Down Expand Up @@ -3788,6 +3789,50 @@ func GetObject_with_meta(s *S3Conf) error {
})
}

func GetObject_large_object(s *S3Conf) error {
testName := "GetObject_large_object"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
//FIXME: make the object size larger after
// resolving the context deadline exceeding issue
// in the github actions
dataLength, obj := int64(100*1024*1024), "my-obj"
ctype := defaultContentType

r, err := putObjectWithData(dataLength, &s3.PutObjectInput{
Bucket: &bucket,
Key: &obj,
ContentType: &ctype,
}, s3client)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), longTimeout)
out, err := s3client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &obj,
})
defer cancel()
if err != nil {
return err
}
if *out.ContentLength != dataLength {
return fmt.Errorf("expected content-length %v, instead got %v", dataLength, out.ContentLength)
}

bdy, err := io.ReadAll(out.Body)
if err != nil {
return err
}
defer out.Body.Close()
outCsum := sha256.Sum256(bdy)
if outCsum != r.csum {
return fmt.Errorf("expected the output data checksum to be %v, instead got %v", r.csum, outCsum)
}
return nil
})
}

func GetObject_success(s *S3Conf) error {
testName := "GetObject_success"
return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
Expand Down

0 comments on commit c094086

Please sign in to comment.