Skip to content
79 changes: 64 additions & 15 deletions cmd/cat-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"unicode"
"unicode/utf8"

"github.com/dustin/go-humanize"
"github.com/minio/cli"
"github.com/minio/mc/pkg/probe"
)
Expand Down Expand Up @@ -59,6 +60,16 @@ var catFlags = []cli.Flag{
Name: "part-number",
Usage: "download only a specific part number",
},
cli.IntFlag{
Name: "parallel, P",
Usage: "number of parallel downloads (default: 1)",
Value: 1,
},
cli.StringFlag{
Name: "part-size",
Usage: "part size for parallel downloads (e.g. 128MiB, 512MiB)",
Value: "128MiB",
},
}

// Display contents of a file.
Expand Down Expand Up @@ -101,6 +112,12 @@ EXAMPLES:

7. Display the content of a particular object version
{{.Prompt}} {{.HelpName}} --vid "3ddac055-89a7-40fa-8cd3-530a5581b6b8" play/my-bucket/my-object

8. Download a large object with parallel downloads (8 threads, 256MiB parts)
{{.Prompt}} {{.HelpName}} --parallel 8 --part-size 256MiB play/my-bucket/large-file.bin > large-file.bin

9. Stream large object to mc pipe with parallel downloads for fast bucket-to-bucket copy
{{.Prompt}} {{.HelpName}} --parallel 16 --part-size 512MiB source/bucket/15tb-file | mc pipe --part-size 512MiB target/bucket/15tb-file
`,
}

Expand Down Expand Up @@ -161,14 +178,16 @@ func (s prettyStdout) Write(input []byte) (int, error) {
}

// catOpts carries the parsed command-line options of the cat command from
// parseCatSyntax to the code that streams each source.
type catOpts struct {
	args        []string  // NOTE(review): presumably the positional source arguments; assignment is not visible here
	versionID   string    // object version requested; forwarded to the stat call and to GetOptions.VersionID
	timeRef     time.Time // NOTE(review): presumably a rewind reference time; confirm where it is populated
	startO      int64     // value of --offset; mutually exclusive with --tail
	tailO       int64     // value of --tail; mutually exclusive with --offset
	partN       int       // value of --part-number; cannot be combined with --tail or --offset
	isZip       bool      // forwarded as GetOptions.Zip
	stdinMode   bool      // NOTE(review): presumably set when input comes from stdin; verify against parseCatSyntax
	parallel    int       // value of --parallel; must be >= 1, values > 1 exclude --tail, --offset, --part-number and --zip
	partSizeStr string    // raw --part-size value (e.g. "128MiB"); parsed into bytes only when a parallel download starts
}

// parseCatSyntax performs command-line input validation for cat command.
Expand Down Expand Up @@ -203,6 +222,9 @@ func parseCatSyntax(ctx *cli.Context) catOpts {
o.startO = ctx.Int64("offset")
o.tailO = ctx.Int64("tail")
o.partN = ctx.Int("part-number")
o.parallel = ctx.Int("parallel")
o.partSizeStr = ctx.String("part-size")

if o.tailO != 0 && o.startO != 0 {
fatalIf(errInvalidArgument().Trace(), "You cannot specify both --tail and --offset")
}
Expand All @@ -218,6 +240,12 @@ func parseCatSyntax(ctx *cli.Context) catOpts {
if (o.tailO != 0 || o.startO != 0) && o.partN > 0 {
fatalIf(errInvalidArgument().Trace(), "You cannot use --part-number with --tail or --offset")
}
if o.parallel > 1 && (o.tailO != 0 || o.startO != 0 || o.partN > 0 || o.isZip) {
fatalIf(errInvalidArgument().Trace(), "You cannot use --parallel with --tail, --offset, --part-number, or --zip")
}
if o.parallel < 1 {
fatalIf(errInvalidArgument().Trace(), "Invalid --parallel value, must be >= 1")
}

return o
}
Expand All @@ -232,13 +260,16 @@ func catURL(ctx context.Context, sourceURL string, encKeyDB map[string][]prefixS
default:
versionID := o.versionID
var err *probe.Error
var client Client
var content *ClientContent

// Try to stat the object, the purpose is to:
// 1. extract the size of S3 object so we can check if the size of the
// downloaded object is equal to the original one. FS files
// are ignored since some of them have zero size though they
// have contents like files under /proc.
// 2. extract the version ID if rewind flag is passed
if client, content, err := url2Stat(ctx, url2StatOptions{
if client, content, err = url2Stat(ctx, url2StatOptions{
urlStr: sourceURL,
versionID: o.versionID,
fileAttr: false,
Expand Down Expand Up @@ -270,12 +301,30 @@ func catURL(ctx context.Context, sourceURL string, encKeyDB map[string][]prefixS
} else {
return err.Trace(sourceURL)
}
gopts := GetOptions{VersionID: versionID, Zip: o.isZip, RangeStart: o.startO, PartNumber: o.partN}
if reader, err = getSourceStreamFromURL(ctx, sourceURL, encKeyDB, getSourceOpts{
GetOptions: gopts,
preserve: false,
}); err != nil {
return err.Trace(sourceURL)

// Use parallel reader for multiple threads
if o.parallel > 1 && size > 0 && client.GetURL().Type == objectStorage {
// Parse part size
partSize, parseErr := humanize.ParseBytes(o.partSizeStr)
if parseErr != nil {
return probe.NewError(parseErr).Trace(sourceURL)
}

gopts := GetOptions{VersionID: versionID, Zip: o.isZip}
pr := NewParallelReader(ctx, client, size, int64(partSize), o.parallel, gopts)
if startErr := pr.Start(); startErr != nil {
return probe.NewError(startErr).Trace(sourceURL)
}
reader = pr
} else {
// Use standard single-threaded reader
gopts := GetOptions{VersionID: versionID, Zip: o.isZip, RangeStart: o.startO, PartNumber: o.partN}
if reader, err = getSourceStreamFromURL(ctx, sourceURL, encKeyDB, getSourceOpts{
GetOptions: gopts,
preserve: false,
}); err != nil {
return err.Trace(sourceURL)
}
}
defer reader.Close()
}
Expand Down
Loading
Loading