6 changes: 5 additions & 1 deletion cmd/client-s3.go
@@ -985,7 +985,11 @@ func (c *S3Client) Copy(ctx context.Context, source string, opts CopyOptions, pr
destOpts.ReplaceMetadata = len(metadata) > 0

var e error
if opts.disableMultipart || opts.size < 64*1024*1024 {
// Use CopyObject for files < 5GiB (its maximum size limit)
// Use ComposeObject for files >= 5GiB (supports multipart copy up to 5TiB)
const maxCopyObjectSize = 5 * 1024 * 1024 * 1024 // 5GiB

if opts.disableMultipart || opts.size < maxCopyObjectSize {
_, e = c.api.CopyObject(ctx, destOpts, srcOpts)
} else {
_, e = c.api.ComposeObject(ctx, destOpts, srcOpts)
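For reference, a minimal standalone sketch of the same CopyObject/ComposeObject split using minio-go v7 directly; the bucket and object names are hypothetical and client construction is elided:

package main

import (
	"context"
	"log"

	"github.com/minio/minio-go/v7"
)

// serverSideCopy picks the copy API by source size, mirroring the
// threshold introduced above: CopyObject for sources under 5 GiB,
// ComposeObject (ranged multipart copy, up to 5 TiB) otherwise.
func serverSideCopy(ctx context.Context, c *minio.Client, size int64) error {
	const maxCopyObjectSize = 5 * 1024 * 1024 * 1024 // 5 GiB

	src := minio.CopySrcOptions{Bucket: "src-bucket", Object: "big.bin"}
	dst := minio.CopyDestOptions{Bucket: "dst-bucket", Object: "big.bin"}

	var err error
	if size < maxCopyObjectSize {
		_, err = c.CopyObject(ctx, dst, src)
	} else {
		_, err = c.ComposeObject(ctx, dst, src)
	}
	return err
}

func main() {
	// Client setup omitted; see minio.New in the minio-go docs.
	log.Println("sketch only")
}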
43 changes: 28 additions & 15 deletions cmd/common-methods.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"io"
"maps"
"net/http"
"os"
"path/filepath"
@@ -36,6 +37,7 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/pkg/v3/console"
"github.com/minio/pkg/v3/env"
)

@@ -290,7 +292,7 @@ func getAllMetadata(ctx context.Context, sourceAlias, sourceURLStr string, srcSS
}

// uploadSourceToTargetURL - uploads to targetURL from source.
// optionally optimizes copy for object sizes <= 5GiB by using
// optionally optimizes copy for object sizes <= 5TiB by using
// server side copy operation.
func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTargetURLOpts) URLs {
sourceAlias := uploadOpts.urls.SourceAlias
@@ -353,8 +355,16 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
metadata[http.CanonicalHeaderKey(k)] = v
}

// Optimize for server side copy if the host is same.
if sourceAlias == targetAlias && !uploadOpts.isZip && !uploadOpts.urls.checksum.IsSet() {
// Server-side copy using the ComposeObject API has a 5TiB limit.
// For files >= 5TiB, we must use stream copy (download + upload) even for same-alias copies.
const maxServerSideCopySize = 5 * 1024 * 1024 * 1024 * 1024 // 5 TiB
canUseServerSideCopy := sourceAlias == targetAlias &&
!uploadOpts.isZip &&
!uploadOpts.urls.checksum.IsSet() &&
length < maxServerSideCopySize

// Optimize for server side copy if the host is same and file size allows it
if canUseServerSideCopy {
// preserve new metadata and save existing ones.
if uploadOpts.preserve {
currentMetadata, err := getAllMetadata(ctx, sourceAlias, sourceURL.String(), srcSSE, uploadOpts.urls)
@@ -377,10 +387,6 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
}

sourcePath := filepath.ToSlash(sourceURL.Path)
if uploadOpts.urls.SourceContent.RetentionEnabled {
err = putTargetRetention(ctx, targetAlias, targetURL.String(), metadata)
return uploadOpts.urls.WithError(err.Trace(sourceURL.String()))
}

opts := CopyOptions{
srcSSE: srcSSE,
@@ -393,6 +399,11 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge

err = copySourceToTargetURL(ctx, targetAlias, targetURL.String(), sourcePath, sourceVersion, mode, until,
legalHold, length, uploadOpts.progress, opts)

// Apply retention after the copy completes, if enabled.
if err == nil && uploadOpts.urls.SourceContent.RetentionEnabled {
err = putTargetRetention(ctx, targetAlias, targetURL.String(), metadata)
}
} else {
if uploadOpts.urls.SourceContent.RetentionEnabled {
// preserve new metadata and save existing ones.
@@ -447,9 +458,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
}

metadata := make(map[string]string, len(content.Metadata))
for k, v := range content.Metadata {
metadata[k] = v
}
maps.Copy(metadata, content.Metadata)

// Get metadata from target content as well
for k, v := range uploadOpts.urls.TargetContent.Metadata {
@@ -486,13 +495,17 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
}
}

if uploadOpts.multipartThreads == "" {
if uploadOpts.multipartThreads == 0 {
multipartThreads, e = strconv.Atoi(env.Get("MC_UPLOAD_MULTIPART_THREADS", "4"))
if e != nil {
return uploadOpts.urls.WithError(probe.NewError(e))
}
} else {
multipartThreads, e = strconv.Atoi(uploadOpts.multipartThreads)
multipartThreads = uploadOpts.multipartThreads
}
if e != nil {
return uploadOpts.urls.WithError(probe.NewError(e))

if globalDebug {
console.Debugln("DEBUG: multipart configuration - part-size:", humanize.IBytes(multipartSize), "parallel:", multipartThreads, "file size:", humanize.IBytes(uint64(length)))
}

putOpts := PutOptions{
@@ -586,7 +599,7 @@ type uploadSourceToTargetURLOpts struct {
encKeyDB map[string][]prefixSSEPair
preserve, isZip bool
multipartSize string
multipartThreads string
multipartThreads int
updateProgressTotal bool
ifNotExists bool
}
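A condensed, self-contained sketch of the thread-count fallback above; the env var name and the default of 4 come from the diff, while os.Getenv stands in for mc's env helper:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// resolveThreads mirrors the logic above: an explicit --parallel value
// wins; otherwise MC_UPLOAD_MULTIPART_THREADS is consulted, and the
// default is 4 threads.
func resolveThreads(flagValue int) (int, error) {
	if flagValue != 0 {
		return flagValue, nil
	}
	if v := os.Getenv("MC_UPLOAD_MULTIPART_THREADS"); v != "" {
		return strconv.Atoi(v)
	}
	return 4, nil
}

func main() {
	n, _ := resolveThreads(0)  // no --parallel flag given
	fmt.Println("threads:", n) // prints 4 unless the env var is set
}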
99 changes: 91 additions & 8 deletions cmd/cp-main.go
@@ -25,6 +25,7 @@ import (
"path/filepath"
"strings"

"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
@@ -72,6 +73,14 @@ var (
Name: "disable-multipart",
Usage: "disable multipart upload feature",
},
cli.StringFlag{
Name: "part-size",
Usage: "part size for multipart uploads (e.g. 16MiB, 64MiB, 128MiB). Max 5GiB. Max file size = part-size × 10000",
},
cli.IntFlag{
Name: "parallel",
Usage: "number of parts to upload in parallel for multipart uploads",
},
cli.BoolFlag{
Name: "md5",
Usage: "force all upload(s) to calculate md5sum checksum",
@@ -194,6 +203,12 @@ EXAMPLES:
19. Set tags to the uploaded objects
{{.Prompt}} {{.HelpName}} -r --tags "category=prod&type=backup" ./data/ play/another-bucket/

20. Copy a large file with custom part size and parallel uploads
{{.Prompt}} {{.HelpName}} --part-size 128MiB --parallel 8 largefile.bin play/mybucket/

21. Copy a very large file (12TB+) with CRC32C checksum and optimized multipart settings
{{.Prompt}} {{.HelpName}} --checksum CRC32C --part-size 5GiB --parallel 8 verylargefile.bin play/mybucket/

`,
}
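The --part-size usage string above encodes S3's 10,000-part cap; here is a quick worked example of what that implies for the very large file in example 21 (a sketch, with sizes chosen as assumptions):

package main

import "fmt"

func main() {
	const (
		maxParts = 10000 // S3 multipart part-count limit
		MiB      = int64(1) << 20
		TiB      = int64(1) << 40
	)

	fileSize := 12 * TiB
	// Smallest part size that still fits in 10,000 parts: ~1.23 GiB,
	// which is why example 21 picks a generous 5GiB part size.
	minPart := (fileSize + maxParts - 1) / maxParts
	fmt.Printf("min part size for 12 TiB: %d MiB\n", minPart/MiB)

	// Conversely, the default 128 MiB part size caps one upload at
	// 128 MiB x 10000, roughly 1.22 TiB.
	fmt.Printf("max size at 128 MiB parts: %.2f TiB\n",
		float64(128*MiB*maxParts)/float64(TiB))
}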

@@ -306,6 +321,37 @@ func printCopyURLsError(cpURLs *URLs) {
}
}

// doCopySession manages the copy session and determines copy strategy.
//
// 1. SERVER-SIDE COPY - no data transfers through the client; preferred when possible.
// Used when all of the following conditions are met:
// - Source and target are on the same alias (same MinIO/S3 server)
// - File size is < 5 TiB (ComposeObject API limit)
// - Not extracting from zip (--zip not used)
// - No checksum verification requested (--checksum not used)
//
// Multipart behavior: Uses ComposeObject API with X-Amz-Copy-Source-Range headers.
// Part size and parallel settings ARE applied via --part-size and --parallel flags.
//
// 2. STREAM COPY (download + upload through client)
// Used when ANY of these conditions are met:
// - Source and target are on different aliases (cross-server copy)
// - File size is >= 5 TiB
// - Extracting from zip (--zip flag used)
// - Checksum verification requested (--checksum flag used)
//
// Multipart behavior: Uses standard multipart upload API.
// Part size and parallel settings ARE applied via --part-size and --parallel flags.
//
// 3. PUT without multipart
// Used when:
// - File size is < 64 MiB (default threshold)
// - OR --disable-multipart flag is used
//
// Notes:
// - The 5 TiB limit is a limitation of the S3 ComposeObject API
// - Default part size: the MinIO SDK default of 128 MiB for multipart uploads
// - Default parallel: 4 threads when --parallel is not set and MC_UPLOAD_MULTIPART_THREADS is unset
func doCopySession(ctx context.Context, cancelCopy context.CancelFunc, cli *cli.Context, encryptionKeys map[string][]prefixSSEPair, isMvCmd bool) error {
var isCopied func(string) bool
var totalObjects, totalBytes int64
@@ -441,17 +487,54 @@ func doCopySession(ctx context.Context, cancelCopy context.CancelFunc, cli *cli.
return doCopyFake(cpURLs, pg)
}, 0)
} else {
const maxServerSideCopySize = 5 * 1024 * 1024 * 1024 * 1024 // 5 TiB
isServerSideCopy := cpURLs.SourceAlias == cpURLs.TargetAlias &&
!isZip &&
!checksum.IsSet() &&
cpURLs.SourceContent.Size < maxServerSideCopySize

// For server-side copy (< 5TiB), pass size=0 to the parallel manager.
// For stream copy, pass the actual size for progress tracking.
queueSize := cpURLs.SourceContent.Size
if isServerSideCopy {
queueSize = 0 // No client bandwidth used for server-side copy
}

if globalDebug {
copyType := "server-side copy"
if !isServerSideCopy {
copyType = "stream copy"
if checksum.IsSet() {
console.Debugln(fmt.Sprintf("DEBUG: Checksum %v requested - forcing stream copy for verification", checksum))
}
}

partSizeStr := cli.String("part-size")
if partSizeStr == "" {
partSizeStr = "default"
}

console.Debugln(fmt.Sprintf("DEBUG: Starting %s - file: %s, size: %s, part-size: %s, parallel: %d",
copyType,
cpURLs.SourceContent.URL.Path,
humanize.IBytes(uint64(cpURLs.SourceContent.Size)),
partSizeStr,
cli.Int("parallel")))
}

// Print the copy resume summary once in start
parallel.queueTask(func() URLs {
return doCopy(ctx, doCopyOpts{
cpURLs: cpURLs,
pg: pg,
encryptionKeys: encryptionKeys,
isMvCmd: isMvCmd,
preserve: preserve,
isZip: isZip,
cpURLs: cpURLs,
pg: pg,
encryptionKeys: encryptionKeys,
isMvCmd: isMvCmd,
preserve: preserve,
isZip: isZip,
multipartSize: cli.String("part-size"),
multipartThreads: cli.Int("parallel"),
})
}, cpURLs.SourceContent.Size)
}, queueSize)
}
}
}
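To make the dispatch in doCopySession concrete, here is the eligibility predicate pulled out as a standalone, testable function; a sketch only — the copyJob struct is hypothetical, while the conditions and the 5 TiB constant follow the diff:

package main

import "fmt"

const maxServerSideCopySize = 5 * 1024 * 1024 * 1024 * 1024 // 5 TiB

// copyJob bundles the inputs the strategy decision depends on.
type copyJob struct {
	sourceAlias, targetAlias string
	size                     int64
	isZip, checksumSet       bool
}

// isServerSideCopy mirrors the predicate above: same alias, no zip
// extraction, no checksum verification, and under the ComposeObject
// 5 TiB ceiling; anything else falls back to stream copy.
func isServerSideCopy(j copyJob) bool {
	return j.sourceAlias == j.targetAlias &&
		!j.isZip &&
		!j.checksumSet &&
		j.size < maxServerSideCopySize
}

func main() {
	fmt.Println(isServerSideCopy(copyJob{"play", "play", 1 << 30, false, false})) // true: same alias, 1 GiB
	fmt.Println(isServerSideCopy(copyJob{"play", "s3", 1 << 30, false, false}))   // false: cross-server
	fmt.Println(isServerSideCopy(copyJob{"play", "play", 6 << 40, false, false})) // false: over 5 TiB
}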
@@ -569,6 +652,6 @@ type doCopyOpts struct {
isMvCmd, preserve, isZip bool
updateProgressTotal bool
multipartSize string
multipartThreads string
multipartThreads int
ifNotExists bool
}