Specify very large block size to force put blob instead of stage block/put block list #2561

Merged
9 commits, merged on Feb 13, 2024
15 changes: 9 additions & 6 deletions cmd/benchmark.go
@@ -46,12 +46,13 @@ type rawBenchmarkCmdArgs struct {
numOfFolders uint

// options from flags
blockSizeMB float64
putMd5 bool
checkLength bool
blobType string
output string
mode string
blockSizeMB float64
putBlobSizeMB float64
putMd5 bool
checkLength bool
blobType string
output string
mode string
}

const (
@@ -147,6 +148,7 @@ func (raw rawBenchmarkCmdArgs) cook() (CookedCopyCmdArgs, error) {
c.forceWrite = common.EOverwriteOption.True().String() // don't want the extra round trip (for overwrite check) when benchmarking

c.blockSizeMB = raw.blockSizeMB
c.putBlobSizeMB = raw.putBlobSizeMB
c.putMd5 = raw.putMd5
c.CheckLength = raw.checkLength
c.blobType = raw.blobType
@@ -337,6 +339,7 @@ func init() {
benchCmd.PersistentFlags().BoolVar(&raw.deleteTestData, "delete-test-data", true, "if true, the benchmark data will be deleted at the end of the benchmark run. Set it to false if you want to keep the data at the destination - e.g. to use it for manual tests outside benchmark mode")

benchCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "use this block size (specified in MiB). Default is automatically calculated based on file size. Decimal fractions are allowed - e.g. 0.25. Identical to the same-named parameter in the copy command")
benchCmd.PersistentFlags().Float64Var(&raw.putBlobSizeMB, "put-blob-size-mb", 0, "Use this size (specified in MiB) as a threshold to determine whether to upload a blob as a single PUT request when uploading to Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
benchCmd.PersistentFlags().StringVar(&raw.blobType, "blob-type", "Detect", "defines the type of blob at the destination. Used to allow benchmarking different blob types. Identical to the same-named parameter in the copy command")
benchCmd.PersistentFlags().BoolVar(&raw.putMd5, "put-md5", false, "create an MD5 hash of each file, and save the hash as the Content-MD5 property of the destination blob/file. (By default the hash is NOT created.) Identical to the same-named parameter in the copy command")
benchCmd.PersistentFlags().BoolVar(&raw.checkLength, "check-length", true, "Check the length of a file on the destination after the transfer. If there is a mismatch between source and destination, the transfer is marked as failed.")
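With the flag plumbed into the benchmark command, a run along the lines of `azcopy benchmark 'https://<account>.blob.core.windows.net/<container>?<SAS>' --size-per-file 200M --put-blob-size-mb 256` would upload each test file in a single Put Blob request instead of staging blocks, which is exactly the behavior this PR makes measurable. (Illustrative invocation only; the exact benchmark syntax and the `--size-per-file` flag are assumptions here, so check `azcopy benchmark --help`.)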
11 changes: 10 additions & 1 deletion cmd/copy.go
@@ -103,6 +103,7 @@ type rawCopyCmdArgs struct {

// options from flags
blockSizeMB float64
putBlobSizeMB float64
metadata string
contentType string
contentEncoding string
@@ -354,6 +355,11 @@ func (raw rawCopyCmdArgs) cook() (CookedCopyCmdArgs, error) {
return cooked, err
}

cooked.putBlobSize, err = blockSizeInBytes(raw.putBlobSizeMB)
if err != nil {
return cooked, err
}

// parse the given blob type.
err = cooked.blobType.Parse(raw.blobType)
if err != nil {
@@ -1123,7 +1129,8 @@ type CookedCopyCmdArgs struct {
autoDecompress bool

// options from flags
blockSize int64
blockSize int64
putBlobSize int64
// list of blobTypes to exclude while enumerating the transfer
excludeBlobType []blob.BlobType
blobType common.BlobType
@@ -1508,6 +1515,7 @@ func (cca *CookedCopyCmdArgs) processCopyJobPartOrders() (err error) {
BlobAttributes: common.BlobTransferAttributes{
BlobType: cca.blobType,
BlockSizeInBytes: cca.blockSize,
PutBlobSizeInBytes: cca.putBlobSize,
ContentType: cca.contentType,
ContentEncoding: cca.contentEncoding,
ContentLanguage: cca.contentLanguage,
@@ -2049,6 +2057,7 @@ func init() {
// options change how the transfers are performed
cpCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "Use this block size (specified in MiB) when uploading to Azure Storage, and downloading from Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25)."+
" When uploading or downloading, maximum allowed block size is 0.75 * AZCOPY_BUFFER_GB. Please refer https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-optimize#optimize-memory-use.")
cpCmd.PersistentFlags().Float64Var(&raw.putBlobSizeMB, "put-blob-size-mb", 0, "Use this size (specified in MiB) as a threshold to determine whether to upload a blob as a single PUT request when uploading to Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
cpCmd.PersistentFlags().StringVar(&raw.blobType, "blob-type", "Detect", "Defines the type of blob at the destination. This is used for uploading blobs and when copying between accounts (default 'Detect'). Valid values include 'Detect', 'BlockBlob', 'PageBlob', and 'AppendBlob'. "+
"When copying between accounts, a value of 'Detect' causes AzCopy to use the type of source blob to determine the type of the destination blob. When uploading a file, 'Detect' determines if the file is a VHD or a VHDX file based on the file extension. If the file is either a VHD or VHDX file, AzCopy treats the file as a page blob.")
cpCmd.PersistentFlags().StringVar(&raw.blockBlobTier, "block-blob-tier", "None", "upload block blob to Azure Storage using this blob tier. (default 'None'). Valid options are Hot, Cold, Cool, Archive")
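The copy `cook()` above (and the sync one below) route the new flag through the same `blockSizeInBytes` helper already used by `--block-size-mb`, so fractional MiB values work: `--put-blob-size-mb 0.25` resolves to 262,144 bytes. The helper itself is not part of this diff; a minimal sketch of the conversion it presumably performs:

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// blockSizeInBytes converts a user-supplied size in MiB (decimal fractions
// allowed) into a whole number of bytes. Sketch only; the real helper in
// cmd/copy.go is not shown in this diff and may validate differently.
func blockSizeInBytes(rawSizeInMiB float64) (int64, error) {
	if rawSizeInMiB < 0 {
		return 0, errors.New("negative size not allowed")
	}
	rawSizeInBytes := rawSizeInMiB * 1024 * 1024 // internally sizes are bytes
	if rawSizeInBytes > math.MaxInt64 {
		return 0, errors.New("size too big for int64")
	}
	// fractions of a MiB are fine, fractions of a byte are not
	const epsilon = 0.001
	if _, frac := math.Modf(rawSizeInBytes); frac >= epsilon && frac <= 1.0-epsilon {
		return 0, fmt.Errorf("%v MiB does not resolve to a whole number of bytes", rawSizeInMiB)
	}
	return int64(math.Round(rawSizeInBytes)), nil
}

func main() {
	n, err := blockSizeInBytes(0.25)
	fmt.Println(n, err) // 262144 <nil>
}
```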
7 changes: 7 additions & 0 deletions cmd/sync.go
@@ -45,6 +45,7 @@ type rawSyncCmdArgs struct {

// options from flags
blockSizeMB float64
putBlobSizeMB float64
include string
exclude string
excludePath string
@@ -199,6 +200,10 @@ func (raw *rawSyncCmdArgs) cook() (cookedSyncCmdArgs, error) {
if err != nil {
return cooked, err
}
cooked.putBlobSize, err = blockSizeInBytes(raw.putBlobSizeMB)
if err != nil {
return cooked, err
}

if err = cooked.symlinkHandling.Determine(raw.followSymlinks, raw.preserveSymlinks); err != nil {
return cooked, err
@@ -396,6 +401,7 @@ type cookedSyncCmdArgs struct {
putMd5 bool
md5ValidationOption common.HashValidationOption
blockSize int64
putBlobSize int64
forceIfReadOnly bool
backupMode bool

@@ -782,6 +788,7 @@ func init() {
// syncCmd.PersistentFlags().BoolVar(&raw.backupMode, common.BackupModeFlagName, false, "Activates Windows' SeBackupPrivilege for uploads, or SeRestorePrivilege for downloads, to allow AzCopy to read all files, regardless of their file system permissions, and to restore all permissions. Requires that the account running AzCopy already has these permissions (e.g. has Administrator rights or is a member of the 'Backup Operators' group). All this flag does is activate privileges that the account already has")

syncCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "Use this block size (specified in MiB) when uploading to Azure Storage or downloading from Azure Storage. Default is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
syncCmd.PersistentFlags().Float64Var(&raw.putBlobSizeMB, "put-blob-size-mb", 0, "Use this size (specified in MiB) as a threshold to determine whether to upload a blob as a single PUT request when uploading to Azure Storage. The default value is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
syncCmd.PersistentFlags().StringVar(&raw.include, "include-pattern", "", "Include only files where the name matches the pattern list. For example: *.jpg;*.pdf;exactName")
syncCmd.PersistentFlags().StringVar(&raw.exclude, "exclude-pattern", "", "Exclude files where the name matches the pattern list. For example: *.jpg;*.pdf;exactName")
syncCmd.PersistentFlags().StringVar(&raw.excludePath, "exclude-path", "", "Exclude these paths when comparing the source against the destination. "+
6 changes: 3 additions & 3 deletions cmd/syncEnumerator.go
@@ -156,6 +156,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
PutMd5: cca.putMd5,
MD5ValidationOption: cca.md5ValidationOption,
BlockSizeInBytes: cca.blockSize,
PutBlobSizeInBytes: cca.putBlobSize,
DeleteDestinationFileIfNecessary: cca.deleteDestinationFileIfNecessary,
},
ForceWrite: common.EOverwriteOption.True(), // once we decide to transfer for a sync operation, we overwrite the destination regardless
@@ -177,9 +178,8 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
},
}


options := createClientOptions(common.AzcopyCurrentJobLogger, nil)

// Create Source Client.
var azureFileSpecificOptions any
if cca.fromTo.From() == common.ELocation.File() {
@@ -213,7 +213,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
if cca.fromTo.IsS2S() && srcCredInfo.CredentialType.IsAzureOAuth() {
srcTokenCred = common.NewScopedCredential(srcCredInfo.OAuthTokenInfo.TokenCredential, srcCredInfo.CredentialType)
}

options = createClientOptions(common.AzcopyCurrentJobLogger, srcTokenCred)
dstURL, _ := cca.destination.String()
copyJobTemplate.DstServiceClient, err = common.GetServiceClientForLocation(
2 changes: 2 additions & 0 deletions common/fe-ste-models.go
@@ -1073,6 +1073,8 @@ func (i *InvalidMetadataHandleOption) UnmarshalJSON(b []byte) error {
const (
DefaultBlockBlobBlockSize = 8 * 1024 * 1024
MaxBlockBlobBlockSize = 4000 * 1024 * 1024
MaxPutBlobSize = 5000 * 1024 * 1024
DefaultPutBlobSize = 256 * 1024 * 1024
MaxAppendBlobBlockSize = 100 * 1024 * 1024
DefaultPageBlobChunkSize = 4 * 1024 * 1024
DefaultAzureFileChunkSize = 4 * 1024 * 1024
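The two new constants bracket the feature: `DefaultPutBlobSize` (256 MiB) gives the engine a built-in default threshold, and `MaxPutBlobSize` (5000 MiB) matches the service-side ceiling for a single Put Blob request, which is why specifying a very large value (per the PR title) forces single-shot uploads for all but the biggest files. A hypothetical helper showing the decision these constants bound (the real logic lives in the ste senders):

```go
package main

import "fmt"

const (
	MiB            = 1024 * 1024
	MaxPutBlobSize = 5000 * MiB // mirrors the constant added above
)

// shouldSingleShot is a hypothetical illustration of the threshold semantics:
// a source that fits within the (capped) put-blob size goes up as one
// Put Blob request; anything larger is staged as blocks and committed with
// Put Block List. The real decision is made inside the ste senders.
func shouldSingleShot(sourceSize, putBlobSize int64) bool {
	if putBlobSize > MaxPutBlobSize {
		putBlobSize = MaxPutBlobSize // same cap jobPartTransferMgr.Info() applies
	}
	return sourceSize <= putBlobSize
}

func main() {
	fmt.Println(shouldSingleShot(101*MiB, 256*MiB)) // true: single PUT, empty block list
	fmt.Println(shouldSingleShot(101*MiB, 50*MiB))  // false: staged blocks
}
```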
9 changes: 5 additions & 4 deletions common/rpc-models.go
@@ -181,10 +181,10 @@ type CopyJobPartOrderRequest struct {
// CredentialInfo contains the essential credential info that needs to be passed between modules
// and is used when creating Azure storage client credentials.
type CredentialInfo struct {
CredentialType CredentialType
OAuthTokenInfo OAuthTokenInfo
S3CredentialInfo S3CredentialInfo
GCPCredentialInfo GCPCredentialInfo
CredentialType CredentialType
OAuthTokenInfo OAuthTokenInfo
S3CredentialInfo S3CredentialInfo
GCPCredentialInfo GCPCredentialInfo
}

func (c CredentialInfo) WithType(credentialType CredentialType) CredentialInfo {
@@ -238,6 +238,7 @@ type BlobTransferAttributes struct {
PutMd5 bool // when uploading, should we create and PUT Content-MD5 hashes
MD5ValidationOption HashValidationOption // when downloading, how strictly should we validate MD5 hashes?
BlockSizeInBytes int64 // when uploading/downloading/copying, specify the size of each chunk
PutBlobSizeInBytes int64 // when uploading, specify the threshold to determine if the blob should be uploaded in a single PUT request
DeleteSnapshotsOption DeleteSnapshotsOption // when deleting, specify what to do with the snapshots
BlobTagsString string // when user explicitly provides blob tags
PermanentDeleteOption PermanentDeleteOption // Permanently deletes soft-deleted snapshots when indicated by user
7 changes: 4 additions & 3 deletions e2etest/declarativeHelpers.go
@@ -151,6 +151,7 @@ type params struct {
forceIfReadOnly bool
capMbps float32
blockSizeMB float32
putBlobSizeMB float32
deleteDestination common.DeleteDestination // Manual validation is needed.
s2sSourceChangeValidation bool
metadata string
@@ -179,9 +180,9 @@ type params struct {
destNull bool

disableParallelTesting bool
deleteDestinationFile bool
trailingDot common.TrailingDotOption
decompress bool
deleteDestinationFile bool
trailingDot common.TrailingDotOption
decompress bool
// looks like this for a folder transfer:
/*
INFO: source: /New folder/New Text Document.txt dest: /Test/New folder/New Text Document.txt
1 change: 1 addition & 0 deletions e2etest/runner.go
@@ -107,6 +107,7 @@ func (t *TestRunner) SetAllFlags(s *scenario) {
set("exclude-pattern", p.excludePattern, "")
set("cap-mbps", p.capMbps, float32(0))
set("block-size-mb", p.blockSizeMB, float32(0))
set("put-blob-size-mb", p.putBlobSizeMB, float32(0))
set("s2s-detect-source-changed", p.s2sSourceChangeValidation, false)
set("metadata", p.metadata, "")
set("cancel-from-stdin", p.cancelFromStdin, false)
71 changes: 71 additions & 0 deletions e2etest/zt_basic_copy_sync_remove_test.go
@@ -1149,3 +1149,74 @@ func TestCopySync_DeleteDestinationFileFlag(t *testing.T) {
"",
)
}

func TestBasic_PutBlobSizeSingleShot(t *testing.T) {
RunScenarios(t, eOperation.CopyAndSync(), eTestFromTo.Other(common.EFromTo.LocalBlob(), common.EFromTo.BlobBlob()), eValidate.Auto(), anonymousAuthOnly, anonymousAuthOnly, params{
recursive: true,
putBlobSizeMB: 256, // 256 MiB, larger than the 101 MiB test file, so the upload is single-shot
}, &hooks{
afterValidation: func(h hookHelper) {
props := h.GetDestination().getAllProperties(h.GetAsserter())
h.GetAsserter().Assert(len(props), equals(), 1)
for key := range props {
// we match the test.txt substring because local test files get a randomized prefix on their names
if strings.Contains(key, "test.txt") {
client := h.GetDestination().(*resourceBlobContainer).containerClient.NewBlockBlobClient(key)
list, err := client.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
if err != nil {
t.Errorf("error getting block list %s", err)
}
if len(list.CommittedBlocks) != 0 {
t.Errorf("expected 0 committed blocks, got %d", len(list.CommittedBlocks))
}
if len(list.UncommittedBlocks) != 0 {
t.Errorf("expected 0 uncommitted blocks, got %d", len(list.UncommittedBlocks))
}
}
}
},
}, testFiles{
defaultSize: "101M",

shouldTransfer: []interface{}{
folder(""),
f("test.txt"),
},
}, EAccountType.Standard(), EAccountType.Standard(), "")
}

func TestBasic_PutBlobSizeMultiPart(t *testing.T) {
RunScenarios(t, eOperation.CopyAndSync(), eTestFromTo.Other(common.EFromTo.LocalBlob(), common.EFromTo.BlobBlob()), eValidate.Auto(), anonymousAuthOnly, anonymousAuthOnly, params{
recursive: true,
putBlobSizeMB: 50, // 50 MiB, smaller than the 101 MiB test file, so blocks must be staged
}, &hooks{
afterValidation: func(h hookHelper) {
props := h.GetDestination().getAllProperties(h.GetAsserter())
h.GetAsserter().Assert(len(props), equals(), 1)
for key := range props {
// we match the test.txt substring because local test files get a randomized prefix on their names
if strings.Contains(key, "test.txt") {
client := h.GetDestination().(*resourceBlobContainer).containerClient.NewBlockBlobClient(key)
list, err := client.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
if err != nil {
t.Errorf("error getting block list %s", err)
}
// default block size is 8 MiB
if len(list.CommittedBlocks) != 13 {
t.Errorf("expected 13 committed blocks, got %d", len(list.CommittedBlocks))
}
if len(list.UncommittedBlocks) != 0 {
t.Errorf("expected 0 uncommitted blocks, got %d", len(list.UncommittedBlocks))
}
}
}
},
}, testFiles{
defaultSize: "101M",

shouldTransfer: []interface{}{
folder(""),
f("test.txt"),
},
}, EAccountType.Standard(), EAccountType.Standard(), "")
}
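The assertions in these two tests follow directly from the sizes chosen: with a 101 MiB file and a 256 MiB threshold, the whole file fits in one Put Blob request, so GetBlockList reports no committed or uncommitted blocks; with a 50 MiB threshold, the file must be staged using the default 8 MiB block size, giving ceil(101 / 8) = 13 committed blocks.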
4 changes: 4 additions & 0 deletions ste/JobPartPlan.go
@@ -46,6 +46,7 @@ type IJobPartPlanHeader interface {
TransferSrcDstStrings(transferIndex uint32) (source string, destination string, isFolder bool)
TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType blob.BlobType, blobTier blob.AccessTier, s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobSnapshotID string, blobTags common.BlobTags)
}

// JobPartPlanHeader represents the header of Job Part's memory-mapped file
type JobPartPlanHeader struct {
// Once set, the following fields are constants; they should never be modified
@@ -351,6 +352,9 @@ type JobPartPlanDstBlob struct {
// Specifies the maximum size of block which determines the number of chunks and chunk size of a transfer
BlockSize int64

// Specifies the maximum size of a blob which can be uploaded by a single PUT request.
PutBlobSize int64

SetPropertiesFlags common.SetPropertiesFlags

DeleteDestinationFileIfNecessary bool
2 changes: 2 additions & 0 deletions ste/JobPartPlanFileName.go
@@ -164,6 +164,7 @@ func (jpfn JobPartPlanFileName) Create(order common.CopyJobPartOrderRequest) {
// panic(errors.New("unrecognized blob type"))
// }*/
// }
putBlobSize := order.BlobAttributes.PutBlobSizeInBytes
// Initialize the Job Part's Plan header
jpph := JobPartPlanHeader{
Version: DataSchemaVersion,
@@ -198,6 +199,7 @@
PageBlobTier: order.BlobAttributes.PageBlobTier,
MetadataLength: uint16(len(order.BlobAttributes.Metadata)),
BlockSize: blockSize,
PutBlobSize: putBlobSize,
BlobTagsLength: uint16(len(order.BlobAttributes.BlobTagsString)),
CpkInfo: order.CpkOptions.CpkInfo,
CpkScopeInfoLength: uint16(len(order.CpkOptions.CpkScopeInfo)),
9 changes: 9 additions & 0 deletions ste/mgr-JobPartTransferMgr.go
@@ -107,6 +107,7 @@ type IJobPartTransferMgr interface {
type TransferInfo struct {
JobID common.JobID
BlockSize int64
PutBlobSize int64
Source string
SourceSize int64
Destination string
@@ -382,6 +383,13 @@ func (jptm *jobPartTransferMgr) Info() *TransferInfo {
}
blockSize = common.Iff(blockSize > common.MaxBlockBlobBlockSize, common.MaxBlockBlobBlockSize, blockSize)

// If putBlobSize is 0, the user didn't provide one; fall back to the block size
putBlobSize := dstBlobData.PutBlobSize
if putBlobSize == 0 {
putBlobSize = blockSize
}
putBlobSize = common.Iff(putBlobSize > common.MaxPutBlobSize, common.MaxPutBlobSize, putBlobSize)

var srcBlobTags common.BlobTags
if blobTags != nil {
srcBlobTags = common.BlobTags{}
…
return &TransferInfo{
JobID: plan.JobID,
BlockSize: blockSize,
PutBlobSize: putBlobSize,
Source: srcURI,
SourceSize: sourceSize,
Destination: dstURI,
4 changes: 2 additions & 2 deletions ste/s2sCopier-URLToBlob.go
@@ -110,7 +110,7 @@ func newURLToBlobCopier(jptm IJobPartTransferMgr, destination string, pacer pace

switch targetBlobType {
case blob.BlobTypeBlockBlob:
return newURLToBlockBlobCopier(jptm, destination, pacer, srcInfoProvider)
return newURLToBlockBlobCopier(jptm, pacer, srcInfoProvider)
case blob.BlobTypeAppendBlob:
return newURLToAppendBlobCopier(jptm, destination, pacer, srcInfoProvider)
case blob.BlobTypePageBlob:
…
destination,
fmt.Sprintf("BlobType %q is used for destination blob by default.", blob.BlobTypeBlockBlob))
}
return newURLToBlockBlobCopier(jptm, destination, pacer, srcInfoProvider)
return newURLToBlockBlobCopier(jptm, pacer, srcInfoProvider)
}
}
2 changes: 1 addition & 1 deletion ste/sender-appendBlob.go
@@ -71,7 +71,7 @@ func newAppendBlobSenderBase(jptm IJobPartTransferMgr, destination string, pacer
chunkSize)

srcSize := transferInfo.SourceSize
numChunks := getNumChunks(srcSize, chunkSize)
numChunks := getNumChunks(srcSize, chunkSize, chunkSize)

bsc, err := jptm.DstServiceClient().BlobServiceClient()
if err != nil {
2 changes: 1 addition & 1 deletion ste/sender-azureFile.go
@@ -87,7 +87,7 @@ func newAzureFileSenderBase(jptm IJobPartTransferMgr, destination string, pacer
}

// compute num chunks (irrelevant but harmless for folders)
numChunks := getNumChunks(info.SourceSize, chunkSize)
numChunks := getNumChunks(info.SourceSize, chunkSize, chunkSize)

// due to the REST parity feature added in 2019-02-02, the File APIs are no longer backward compatible
// so we must use the latest SDK version to stay safe
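Both senders above now pass a third argument to `getNumChunks`: the new parameter is presumably the put-blob threshold, and the append-blob and Azure Files senders fill it with `chunkSize` because single-shot Put Blob does not apply to them. The updated helper is not included in this diff; a plausible reconstruction under that assumption:

```go
package main

import "fmt"

// getNumChunks sketches the assumed three-argument signature: a transfer that
// fits within putBlobSize is one "chunk" (the single PUT); otherwise the
// source splits into ceil(sourceSize/chunkSize) chunks. Hypothetical; the
// real helper in ste may differ.
func getNumChunks(sourceSize, chunkSize, putBlobSize int64) uint32 {
	if sourceSize <= putBlobSize {
		return 1 // zero-byte and single-shot transfers still get one chunk
	}
	numChunks := sourceSize / chunkSize
	if sourceSize%chunkSize != 0 {
		numChunks++
	}
	return uint32(numChunks)
}

func main() {
	const MiB = 1024 * 1024
	fmt.Println(getNumChunks(101*MiB, 8*MiB, 256*MiB)) // 1: single-shot
	fmt.Println(getNumChunks(101*MiB, 8*MiB, 50*MiB))  // 13: staged blocks
}
```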