127 changes: 102 additions & 25 deletions api-compose-object.go
@@ -23,8 +23,10 @@ import (
"io"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
@@ -82,6 +84,16 @@ type CopyDestOptions struct {
Size int64 // Needs to be specified if progress bar is specified.
// Progress of the entire copy operation will be sent here.
Progress io.Reader

// NumThreads sets the number of concurrent part uploads. If unset or
// zero, it defaults to 4; values above 100 are capped at 100.
NumThreads int

// PartSize sets the part size for multipart uploads. If not set, the
// part size is calculated automatically. Minimum is 5 MiB (absMinPartSize),
// maximum is 5 GiB (maxPartSize). A custom part size is useful for
// controlling memory usage and tuning for different network conditions.
PartSize int64
}

// Process custom-metadata to remove a `x-amz-meta-` prefix if
Expand Down Expand Up @@ -170,6 +182,13 @@ func (opts CopyDestOptions) validate() (err error) {
if opts.Progress != nil && opts.Size < 0 {
return errInvalidArgument("For progress bar effective size needs to be specified")
}
// Validate part size if specified
if opts.PartSize > 0 && opts.PartSize < absMinPartSize {
return errInvalidArgument(fmt.Sprintf("PartSize must be at least %d bytes (5 MiB)", absMinPartSize))
}
if opts.PartSize > maxPartSize {
return errInvalidArgument(fmt.Sprintf("PartSize must not exceed %d bytes (5 GiB)", maxPartSize))
}
return nil
}

@@ -468,7 +487,7 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
srcObjectSizes[i] = srcCopySize

// calculate parts needed for current source
totalParts += partsRequired(srcCopySize)
totalParts += partsRequired(srcCopySize, dst.PartSize)
// Do we need more parts than we are allowed?
if totalParts > maxPartsCount {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
@@ -523,7 +542,32 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
}

// 3. Perform copy part uploads
objParts := []CompletePart{}
// Determine concurrency level
numThreads := dst.NumThreads
if numThreads <= 0 {
numThreads = 4 // default concurrency
}
if numThreads > 100 {
numThreads = 100 // maximum concurrency
}

// Count total parts needed
totalPartsNeeded := 0
for i := range srcs {
startIdx, _ := calculateEvenSplits(srcObjectSizes[i], srcs[i], dst.PartSize)
totalPartsNeeded += len(startIdx)
}

// Create channel for results and semaphore for concurrency control
type partResult struct {
part CompletePart
err error
}
results := make(chan partResult, totalPartsNeeded)
var wg sync.WaitGroup
sem := make(chan struct{}, numThreads)

// Launch parallel upload-part-copy operations
partIndex := 1
for i, src := range srcs {
h := make(http.Header)
@@ -532,31 +576,57 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
dst.Encryption.Marshal(h)
}

// calculate start/end indices of parts after
// splitting.
startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
// calculate start/end indices of parts after splitting.
startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src, dst.PartSize)
for j, start := range startIdx {
end := endIdx[j]

// Add (or reset) source range header for
// upload part copy request.
h.Set("x-amz-copy-source-range",
fmt.Sprintf("bytes=%d-%d", start, end))
wg.Add(1)
sem <- struct{}{} // acquire semaphore

go func(idx int, s, e int64, hdr http.Header) {
defer wg.Done()
defer func() { <-sem }() // release semaphore

// Add (or reset) source range header for upload part copy request.
hdr.Set("x-amz-copy-source-range",
fmt.Sprintf("bytes=%d-%d", s, e))

// make upload-part-copy request
complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
dst.Object, uploadID, idx, hdr)

results <- partResult{part: complPart, err: err}

if err == nil && dst.Progress != nil {
io.CopyN(io.Discard, dst.Progress, e-s+1)
}
}(partIndex, start, end, h.Clone())

// make upload-part-copy request
complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
dst.Object, uploadID, partIndex, h)
if err != nil {
return UploadInfo{}, err
}
if dst.Progress != nil {
io.CopyN(io.Discard, dst.Progress, end-start+1)
}
objParts = append(objParts, complPart)
partIndex++
}
}

// Close results channel when all goroutines complete
go func() {
wg.Wait()
close(results)
}()

// Collect results
objParts := make([]CompletePart, 0, totalPartsNeeded)
for res := range results {
if res.err != nil {
return UploadInfo{}, res.err
}
objParts = append(objParts, res.part)
}

// Sort parts by part number to ensure correct order
sort.Slice(objParts, func(i, j int) bool {
return objParts[i].PartNumber < objParts[j].PartNumber
})

// 4. Make final complete-multipart request.
uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
completeMultipartUpload{Parts: objParts}, PutObjectOptions{ServerSideEncryption: dst.Encryption})
@@ -570,10 +640,17 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..

// partsRequired is maximum parts possible with
// max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1))
func partsRequired(size int64) int64 {
maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
r := size / int64(maxPartSize)
if size%int64(maxPartSize) > 0 {
// If customPartSize is specified and valid, it will be used instead.
func partsRequired(size int64, customPartSize int64) int64 {
partSize := int64(maxMultipartPutObjectSize / (maxPartsCount - 1))

// Use custom part size if specified and valid
if customPartSize > 0 {
partSize = customPartSize
}

r := size / partSize
if size%partSize > 0 {
r++
}
return r
@@ -583,12 +660,12 @@ func partsRequired(size int64) int64 {
// start and end index slices. Splits happen evenly to be sure that no
// part is less than 5MiB, as that could fail the multipart request if
// it is not the last part.
func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
func calculateEvenSplits(size int64, src CopySrcOptions, customPartSize int64) (startIndex, endIndex []int64) {
if size == 0 {
return startIndex, endIndex
}

reqParts := partsRequired(size)
reqParts := partsRequired(size, customPartSize)
startIndex = make([]int64, reqParts)
endIndex = make([]int64, reqParts)
// Compute number of required parts `k`, as:
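The heart of the change is a bounded-concurrency fan-out: a buffered channel used as a semaphore caps in-flight part copies at numThreads, a second buffered channel (sized to the total part count, so workers never block on send) collects results, and the parts are sorted afterwards because goroutines finish in arbitrary order. A minimal, self-contained sketch of that pattern, with a stand-in for the real uploadPartCopy call (none of these names are the library's):

package main

import (
	"fmt"
	"sort"
	"sync"
)

type partResult struct {
	partNumber int
	err        error
}

func main() {
	const totalParts = 10
	const numThreads = 4

	results := make(chan partResult, totalParts) // buffered: workers never block on send
	sem := make(chan struct{}, numThreads)       // semaphore capping concurrency
	var wg sync.WaitGroup

	for p := 1; p <= totalParts; p++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot before launching
		go func(part int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			// Stand-in for the upload-part-copy request.
			results <- partResult{partNumber: part}
		}(p)
	}

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	parts := make([]partResult, 0, totalParts)
	for r := range results {
		if r.err != nil {
			fmt.Println("part failed:", r.err)
			return
		}
		parts = append(parts, r)
	}

	// Completion order is nondeterministic; restore part-number order.
	sort.Slice(parts, func(i, j int) bool { return parts[i].partNumber < parts[j].partNumber })
	fmt.Println("assembled", len(parts), "parts in order")
}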
28 changes: 16 additions & 12 deletions api-compose-object_test.go
@@ -33,21 +33,25 @@ const (

func TestPartsRequired(t *testing.T) {
testCases := []struct {
size, ref int64
size, customPartSize, ref int64
}{
{0, 0},
{1, 1},
{gb5, 10},
{gb5p1, 10},
{2 * gb5, 20},
{gb10p1, 20},
{gb10p2, 20},
{gb10p1 + gb10p2, 40},
{maxMultipartPutObjectSize, 10000},
{0, 0, 0},
{1, 0, 1},
{gb5, 0, 10},
{gb5p1, 0, 10},
{2 * gb5, 0, 20},
{gb10p1, 0, 20},
{gb10p2, 0, 20},
{gb10p1 + gb10p2, 0, 40},
{maxMultipartPutObjectSize, 0, 10000},
// Test with custom part size (10 MiB)
{100 * 1024 * 1024, 10 * 1024 * 1024, 10},
// Test with custom part size (5 MiB)
{50 * 1024 * 1024, 5 * 1024 * 1024, 10},
}

for i, testCase := range testCases {
res := partsRequired(testCase.size)
res := partsRequired(testCase.size, testCase.customPartSize)
if res != testCase.ref {
t.Errorf("Test %d - output did not match with reference results, Expected %d, got %d", i+1, testCase.ref, res)
}
@@ -143,7 +147,7 @@ func TestCalculateEvenSplits(t *testing.T) {
}

for i, testCase := range testCases {
resStart, resEnd := calculateEvenSplits(testCase.size, testCase.src)
resStart, resEnd := calculateEvenSplits(testCase.size, testCase.src, 0)
if !reflect.DeepEqual(testCase.starts, resStart) || !reflect.DeepEqual(testCase.ends, resEnd) {
t.Errorf("Test %d - output did not match with reference results, Expected %d/%d, got %d/%d", i+1, testCase.starts, testCase.ends, resStart, resEnd)
}
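The new expectations in TestPartsRequired are plain ceiling division. With the default part size of maxMultipartPutObjectSize / (maxPartsCount - 1), a 5 GiB object still needs 10 parts; with a custom part size the count is simply ceil(size / partSize), so 100 MiB at 10 MiB parts and 50 MiB at 5 MiB parts both come out to 10. A tiny sketch that re-derives the custom-part-size rows (partsNeeded is a hypothetical mirror of partsRequired's arithmetic, not the library function):

package main

import "fmt"

// partsNeeded mirrors the ceiling division used by partsRequired.
func partsNeeded(size, partSize int64) int64 {
	n := size / partSize
	if size%partSize > 0 {
		n++
	}
	return n
}

func main() {
	const mib = int64(1 << 20)
	fmt.Println(partsNeeded(100*mib, 10*mib)) // 10 — the new 10 MiB row
	fmt.Println(partsNeeded(50*mib, 5*mib))   // 10 — the new 5 MiB row
}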
91 changes: 91 additions & 0 deletions examples/s3/compose-object-parallel.go
@@ -0,0 +1,91 @@
//go:build ignore
// +build ignore

/*
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
* Copyright 2024 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"log"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-bucketname and my-objectname
// are dummy values, please replace them with actual values.

// New returns an Amazon S3 compatible client object. API compatibility (v2 or v4) is automatically
// determined based on the Endpoint value.
s3Client, err := minio.New("s3.amazonaws.com", &minio.Options{
Creds: credentials.NewStaticV4("YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", ""),
Secure: true,
})
if err != nil {
log.Fatalln(err)
}

// Prepare source objects to concatenate. We need to specify information
// about the source objects being concatenated. Since we are using a
// list of copy sources, none of the source objects can be less than
// the minimum part size, except the last one.
srcOpts1 := minio.CopySrcOptions{
Bucket: "my-bucketname",
Object: "my-objectname-part-1",
}
srcOpts2 := minio.CopySrcOptions{
Bucket: "my-bucketname",
Object: "my-objectname-part-2",
}
srcOpts3 := minio.CopySrcOptions{
Bucket: "my-bucketname",
Object: "my-objectname-part-3",
}

// Prepare destination object.
dstOpts := minio.CopyDestOptions{
Bucket: "my-bucketname",
Object: "my-objectname-composite",

// Configure parallel uploads with 10 concurrent threads
NumThreads: 10,

// Configure custom part size (10 MiB)
// This is useful for controlling memory usage and optimizing for
// different network conditions. If not specified, uses automatic calculation.
PartSize: 10 * 1024 * 1024, // 10 MiB
}

// Perform the compose operation with parallel uploads
uploadInfo, err := s3Client.ComposeObject(context.Background(), dstOpts, srcOpts1, srcOpts2, srcOpts3)
if err != nil {
log.Fatalln(err)
}

log.Println("Composed object successfully:")
log.Printf("Bucket: %s\n", uploadInfo.Bucket)
log.Printf("Object: %s\n", uploadInfo.Key)
log.Printf("Size: %d bytes\n", uploadInfo.Size)
log.Printf("ETag: %s\n", uploadInfo.ETag)

log.Println("\nParallel compose completed with:")
log.Printf("- %d concurrent threads\n", dstOpts.NumThreads)
log.Printf("- Part size: %d bytes\n", dstOpts.PartSize)
}
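One behavior the example does not exercise: the new checks in validate() reject an out-of-range PartSize before any request is made, so a too-small value fails fast. A hedged sketch of what a caller would see, assuming ComposeObject validates its destination options up front and reusing s3Client and srcOpts1 from the example above:

badOpts := minio.CopyDestOptions{
	Bucket:   "my-bucketname",
	Object:   "my-objectname-composite",
	PartSize: 1 * 1024 * 1024, // 1 MiB: below the 5 MiB (absMinPartSize) floor
}
if _, err := s3Client.ComposeObject(context.Background(), badOpts, srcOpts1); err != nil {
	// Expected: the errInvalidArgument message "PartSize must be at least ... bytes (5 MiB)".
	log.Println(err)
}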
1 change: 1 addition & 0 deletions examples/s3/go.sum
@@ -60,6 +60,7 @@ golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=