From 9753647defcceb0f3d7112283ab6f3c8434f1ada Mon Sep 17 00:00:00 2001
From: izzl
Date: Sun, 2 Nov 2025 21:12:59 +1100
Subject: [PATCH] feat: compose object can be parallelised

---
 api-compose-object.go                  | 127 ++++++++++++++++++++-----
 api-compose-object_test.go             |  28 +++---
 examples/s3/compose-object-parallel.go |  91 ++++++++++++++++++
 examples/s3/go.sum                     |   1 +
 4 files changed, 210 insertions(+), 37 deletions(-)
 create mode 100644 examples/s3/compose-object-parallel.go

diff --git a/api-compose-object.go b/api-compose-object.go
index 232bd2c01..b8fb90491 100644
--- a/api-compose-object.go
+++ b/api-compose-object.go
@@ -23,8 +23,10 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/google/uuid"
@@ -82,6 +84,16 @@ type CopyDestOptions struct {
 	Size int64 // Needs to be specified if progress bar is specified.
 	// Progress of the entire copy operation will be sent here.
 	Progress io.Reader
+
+	// NumThreads sets the number of concurrent part uploads. If not set,
+	// it defaults to 4. The maximum allowed is 100.
+	NumThreads int
+
+	// PartSize sets the part size for multipart uploads. If not set,
+	// the part size is calculated automatically. The minimum is 5 MiB
+	// (absMinPartSize). This is useful for controlling memory usage and
+	// for optimizing for different network conditions.
+	PartSize int64
 }
 
 // Process custom-metadata to remove a `x-amz-meta-` prefix if
@@ -170,6 +182,13 @@ func (opts CopyDestOptions) validate() (err error) {
 	if opts.Progress != nil && opts.Size < 0 {
 		return errInvalidArgument("For progress bar effective size needs to be specified")
 	}
+	// Validate the part size if specified.
+	if opts.PartSize > 0 && opts.PartSize < absMinPartSize {
+		return errInvalidArgument(fmt.Sprintf("PartSize must be at least %d bytes (5 MiB)", absMinPartSize))
+	}
+	if opts.PartSize > maxPartSize {
+		return errInvalidArgument(fmt.Sprintf("PartSize must not exceed %d bytes (5 GiB)", maxPartSize))
+	}
 	return nil
 }
 
@@ -468,7 +487,7 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
 		srcObjectSizes[i] = srcCopySize
 
 		// calculate parts needed for current source
-		totalParts += partsRequired(srcCopySize)
+		totalParts += partsRequired(srcCopySize, dst.PartSize)
 		// Do we need more parts than we are allowed?
 		if totalParts > maxPartsCount {
 			return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
@@ -523,7 +542,32 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
 	}
 
 	// 3. Perform copy part uploads
-	objParts := []CompletePart{}
+	// Determine the concurrency level.
+	numThreads := dst.NumThreads
+	if numThreads <= 0 {
+		numThreads = 4 // default concurrency
+	}
+	if numThreads > 100 {
+		numThreads = 100 // maximum concurrency
+	}
+
+	// Count the total number of parts needed.
+	totalPartsNeeded := 0
+	for i := range srcs {
+		startIdx, _ := calculateEvenSplits(srcObjectSizes[i], srcs[i], dst.PartSize)
+		totalPartsNeeded += len(startIdx)
+	}
+
+	// Create a channel for results and a semaphore for concurrency control.
+	type partResult struct {
+		part CompletePart
+		err  error
+	}
+	results := make(chan partResult, totalPartsNeeded)
+	var wg sync.WaitGroup
+	sem := make(chan struct{}, numThreads)
+
+	// Launch parallel upload-part-copy operations.
 	partIndex := 1
 	for i, src := range srcs {
 		h := make(http.Header)
@@ -532,31 +576,57 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
 			dst.Encryption.Marshal(h)
 		}
 
-		// calculate start/end indices of parts after
-		// splitting.
-		startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
+		// calculate start/end indices of parts after splitting.
+		startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src, dst.PartSize)
 
 		for j, start := range startIdx {
 			end := endIdx[j]
-			// Add (or reset) source range header for
-			// upload part copy request.
-			h.Set("x-amz-copy-source-range",
-				fmt.Sprintf("bytes=%d-%d", start, end))
+			wg.Add(1)
+			sem <- struct{}{} // acquire semaphore
+
+			go func(idx int, s, e int64, hdr http.Header) {
+				defer wg.Done()
+				defer func() { <-sem }() // release semaphore
+
+				// Add (or reset) source range header for upload part copy request.
+				hdr.Set("x-amz-copy-source-range",
+					fmt.Sprintf("bytes=%d-%d", s, e))
+
+				// make upload-part-copy request
+				complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
+					dst.Object, uploadID, idx, hdr)
+
+				results <- partResult{part: complPart, err: err}
+
+				if err == nil && dst.Progress != nil {
+					io.CopyN(io.Discard, dst.Progress, e-s+1)
+				}
+			}(partIndex, start, end, h.Clone())
 
-			// make upload-part-copy request
-			complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
-				dst.Object, uploadID, partIndex, h)
-			if err != nil {
-				return UploadInfo{}, err
-			}
-			if dst.Progress != nil {
-				io.CopyN(io.Discard, dst.Progress, end-start+1)
-			}
-			objParts = append(objParts, complPart)
 			partIndex++
 		}
 	}
 
+	// Close results channel when all goroutines complete
+	go func() {
+		wg.Wait()
+		close(results)
+	}()
+
+	// Collect results
+	objParts := make([]CompletePart, 0, totalPartsNeeded)
+	for res := range results {
+		if res.err != nil {
+			return UploadInfo{}, res.err
+		}
+		objParts = append(objParts, res.part)
+	}
+
+	// Sort parts by part number to ensure correct order
+	sort.Slice(objParts, func(i, j int) bool {
+		return objParts[i].PartNumber < objParts[j].PartNumber
+	})
+
 	// 4. Make final complete-multipart request.
 	uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
 		completeMultipartUpload{Parts: objParts},
 		PutObjectOptions{ServerSideEncryption: dst.Encryption})
@@ -570,10 +640,17 @@ func (c *Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ..
 
 // partsRequired is maximum parts possible with
 // max part size of ceiling(maxMultipartPutObjectSize / (maxPartsCount - 1))
-func partsRequired(size int64) int64 {
-	maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
-	r := size / int64(maxPartSize)
-	if size%int64(maxPartSize) > 0 {
+// If customPartSize is specified and valid, it will be used instead.
+func partsRequired(size int64, customPartSize int64) int64 {
+	partSize := int64(maxMultipartPutObjectSize / (maxPartsCount - 1))
+
+	// Use custom part size if specified and valid
+	if customPartSize > 0 {
+		partSize = customPartSize
+	}
+
+	r := size / partSize
+	if size%partSize > 0 {
 		r++
 	}
 	return r
@@ -583,12 +660,12 @@
 // calculateEvenSplits - computes splits for a source and returns
 // start and end index slices. Splits happen evenly to be sure that no
 // part is less than 5MiB, as that could fail the multipart request if
 // it is not the last part.
-func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
+func calculateEvenSplits(size int64, src CopySrcOptions, customPartSize int64) (startIndex, endIndex []int64) {
 	if size == 0 {
 		return startIndex, endIndex
 	}
 
-	reqParts := partsRequired(size)
+	reqParts := partsRequired(size, customPartSize)
 	startIndex = make([]int64, reqParts)
 	endIndex = make([]int64, reqParts)
 	// Compute number of required parts `k`, as:
diff --git a/api-compose-object_test.go b/api-compose-object_test.go
index f573057b7..f399d22e1 100644
--- a/api-compose-object_test.go
+++ b/api-compose-object_test.go
@@ -33,21 +33,25 @@
 
 func TestPartsRequired(t *testing.T) {
 	testCases := []struct {
-		size, ref int64
+		size, customPartSize, ref int64
 	}{
-		{0, 0},
-		{1, 1},
-		{gb5, 10},
-		{gb5p1, 10},
-		{2 * gb5, 20},
-		{gb10p1, 20},
-		{gb10p2, 20},
-		{gb10p1 + gb10p2, 40},
-		{maxMultipartPutObjectSize, 10000},
+		{0, 0, 0},
+		{1, 0, 1},
+		{gb5, 0, 10},
+		{gb5p1, 0, 10},
+		{2 * gb5, 0, 20},
+		{gb10p1, 0, 20},
+		{gb10p2, 0, 20},
+		{gb10p1 + gb10p2, 0, 40},
+		{maxMultipartPutObjectSize, 0, 10000},
+		// Test with custom part size (10 MiB)
+		{100 * 1024 * 1024, 10 * 1024 * 1024, 10},
+		// Test with custom part size (5 MiB)
+		{50 * 1024 * 1024, 5 * 1024 * 1024, 10},
 	}
 
 	for i, testCase := range testCases {
-		res := partsRequired(testCase.size)
+		res := partsRequired(testCase.size, testCase.customPartSize)
 		if res != testCase.ref {
 			t.Errorf("Test %d - output did not match with reference results, Expected %d, got %d", i+1, testCase.ref, res)
 		}
@@ -143,7 +147,7 @@ func TestCalculateEvenSplits(t *testing.T) {
 	}
 
 	for i, testCase := range testCases {
-		resStart, resEnd := calculateEvenSplits(testCase.size, testCase.src)
+		resStart, resEnd := calculateEvenSplits(testCase.size, testCase.src, 0)
 		if !reflect.DeepEqual(testCase.starts, resStart) || !reflect.DeepEqual(testCase.ends, resEnd) {
 			t.Errorf("Test %d - output did not match with reference results, Expected %d/%d, got %d/%d", i+1, testCase.starts, testCase.ends, resStart, resEnd)
 		}
diff --git a/examples/s3/compose-object-parallel.go b/examples/s3/compose-object-parallel.go
new file mode 100644
index 000000000..3336f9ea0
--- /dev/null
+++ b/examples/s3/compose-object-parallel.go
@@ -0,0 +1,91 @@
+//go:build ignore
+// +build ignore
+
+/*
+ * MinIO Go Library for Amazon S3 Compatible Cloud Storage
+ * Copyright 2024 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"context"
+	"log"
+
+	"github.com/minio/minio-go/v7"
+	"github.com/minio/minio-go/v7/pkg/credentials"
+)
+
+func main() {
+	// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-bucketname and my-objectname
+	// are dummy values, please replace them with original values.
+
+	// New returns an Amazon S3 compatible client object. API compatibility (v2 or v4) is automatically
+	// determined based on the Endpoint value.
+	s3Client, err := minio.New("s3.amazonaws.com", &minio.Options{
+		Creds:  credentials.NewStaticV4("YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", ""),
+		Secure: true,
+	})
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	// Prepare source objects to concatenate. We need to specify information
+	// about the source objects being concatenated. Since we are using a
+	// list of copy sources, none of the source objects can be less than
+	// the minimum part size, except the last one.
+	srcOpts1 := minio.CopySrcOptions{
+		Bucket: "my-bucketname",
+		Object: "my-objectname-part-1",
+	}
+	srcOpts2 := minio.CopySrcOptions{
+		Bucket: "my-bucketname",
+		Object: "my-objectname-part-2",
+	}
+	srcOpts3 := minio.CopySrcOptions{
+		Bucket: "my-bucketname",
+		Object: "my-objectname-part-3",
+	}
+
+	// Prepare destination object.
+	dstOpts := minio.CopyDestOptions{
+		Bucket: "my-bucketname",
+		Object: "my-objectname-composite",
+
+		// Configure parallel uploads with 10 concurrent threads.
+		NumThreads: 10,
+
+		// Configure a custom part size (10 MiB). This is useful for
+		// controlling memory usage and optimizing for different network
+		// conditions. If not specified, the part size is calculated automatically.
+		PartSize: 10 * 1024 * 1024, // 10 MiB
+	}
+
+	// Perform the compose operation with parallel uploads.
+	uploadInfo, err := s3Client.ComposeObject(context.Background(), dstOpts, srcOpts1, srcOpts2, srcOpts3)
+	if err != nil {
+		log.Fatalln(err)
+	}
+
+	log.Println("Composed object successfully:")
+	log.Printf("Bucket: %s\n", uploadInfo.Bucket)
+	log.Printf("Object: %s\n", uploadInfo.Key)
+	log.Printf("Size: %d bytes\n", uploadInfo.Size)
+	log.Printf("ETag: %s\n", uploadInfo.ETag)
+
+	log.Println("\nParallel compose completed with:")
+	log.Printf("- %d concurrent threads\n", dstOpts.NumThreads)
+	log.Printf("- Part size: %d bytes\n", dstOpts.PartSize)
+}
diff --git a/examples/s3/go.sum b/examples/s3/go.sum
index 0b4d5ca5b..1f3f6a510 100644
--- a/examples/s3/go.sum
+++ b/examples/s3/go.sum
@@ -60,6 +60,7 @@ golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
 golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
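Backward compatibility: NumThreads and PartSize are purely additive, so existing callers compile unchanged; with both left at their zero values, ComposeObject now performs 4 concurrent part copies with automatic part sizing, per the defaults introduced above. A minimal sketch of such a caller follows (endpoint, credentials, bucket, and object names are placeholder values):

package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	s3Client, err := minio.New("s3.amazonaws.com", &minio.Options{
		Creds:  credentials.NewStaticV4("YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatalln(err)
	}

	src := minio.CopySrcOptions{Bucket: "my-bucketname", Object: "my-objectname-part-1"}

	// NumThreads and PartSize are left at their zero values, so ComposeObject
	// falls back to the defaults: 4 concurrent part copies and automatic part sizing.
	dst := minio.CopyDestOptions{Bucket: "my-bucketname", Object: "my-objectname"}

	if _, err := s3Client.ComposeObject(context.Background(), dst, src); err != nil {
		log.Fatalln(err)
	}
}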