diff --git a/sdk/storage/azblob/assets.json b/sdk/storage/azblob/assets.json index b9fb1441baa5..6cc6487baabf 100644 --- a/sdk/storage/azblob/assets.json +++ b/sdk/storage/azblob/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "go", "TagPrefix": "go/storage/azblob", - "Tag": "go/storage/azblob_1a8d8f30f5" + "Tag": "go/storage/azblob_23a06ae998" } diff --git a/sdk/storage/azblob/blob/client_test.go b/sdk/storage/azblob/blob/client_test.go index 64bf13390f52..cc1dffcedef1 100644 --- a/sdk/storage/azblob/blob/client_test.go +++ b/sdk/storage/azblob/blob/client_test.go @@ -191,67 +191,6 @@ func waitForCopy(_require *require.Assertions, copyBlobClient *blockblob.Client, } } -func (s *BlobUnrecordedTestsSuite) TestCopyBlockBlobFromUrlSourceContentMD5() { - _require := require.New(s.T()) - testName := s.T().Name() - svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) - if err != nil { - s.Fail("Unable to fetch service client because " + err.Error()) - } - - containerName := testcommon.GenerateContainerName(testName) - containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) - defer testcommon.DeleteContainer(context.Background(), _require, containerClient) - - const contentSize = 8 * 1024 // 8 KB - content := make([]byte, contentSize) - contentMD5 := md5.Sum(content) - body := bytes.NewReader(content) - - srcBlob := containerClient.NewBlockBlobClient("srcblob") - destBlob := containerClient.NewBlockBlobClient("destblob") - - // Prepare source bbClient for copy. - _, err = srcBlob.Upload(context.Background(), streaming.NopCloser(body), nil) - _require.Nil(err) - - expiryTime, err := time.Parse(time.UnixDate, "Fri Jun 11 20:00:00 UTC 2049") - _require.Nil(err) - - credential, err := testcommon.GetGenericSharedKeyCredential(testcommon.TestAccountDefault) - if err != nil { - s.T().Fatal("Couldn't fetch credential because " + err.Error()) - } - - // Get source blob url with SAS for StageFromURL. - sasQueryParams, err := sas.AccountSignatureValues{ - Protocol: sas.ProtocolHTTPS, - ExpiryTime: expiryTime, - Permissions: to.Ptr(sas.AccountPermissions{Read: true, List: true}).String(), - ResourceTypes: to.Ptr(sas.AccountResourceTypes{Container: true, Object: true}).String(), - }.SignWithSharedKey(credential) - _require.Nil(err) - - srcBlobParts, _ := blob.ParseURL(srcBlob.URL()) - srcBlobParts.SAS = sasQueryParams - srcBlobURLWithSAS := srcBlobParts.String() - - // Invoke CopyFromURL. - sourceContentMD5 := contentMD5[:] - resp, err := destBlob.CopyFromURL(context.Background(), srcBlobURLWithSAS, &blob.CopyFromURLOptions{ - SourceContentMD5: sourceContentMD5, - }) - _require.Nil(err) - _require.EqualValues(resp.ContentMD5, sourceContentMD5) - - // Provide bad MD5 and make sure the copy fails - _, badMD5 := testcommon.GetDataAndReader(testName, 16) - resp, err = destBlob.CopyFromURL(context.Background(), srcBlobURLWithSAS, &blob.CopyFromURLOptions{ - SourceContentMD5: badMD5, - }) - _require.NotNil(err) -} - func (s *BlobRecordedTestsSuite) TestBlobStartCopyDestEmpty() { _require := require.New(s.T()) testName := s.T().Name() diff --git a/sdk/storage/azblob/bloberror/error_codes.go b/sdk/storage/azblob/bloberror/error_codes.go index ad653c1c4655..8a1573c0ce25 100644 --- a/sdk/storage/azblob/bloberror/error_codes.go +++ b/sdk/storage/azblob/bloberror/error_codes.go @@ -153,4 +153,5 @@ const ( var ( // MissingSharedKeyCredential - Error is returned when SAS URL is being created without SharedKeyCredential. MissingSharedKeyCredential = errors.New("SAS can only be signed with a SharedKeyCredential") + UnsupportedChecksum = errors.New("for multi-part uploads, user generated checksums cannot be validated") ) diff --git a/sdk/storage/azblob/blockblob/client.go b/sdk/storage/azblob/blockblob/client.go index a480a518db5a..eedf4c2362e6 100644 --- a/sdk/storage/azblob/blockblob/client.go +++ b/sdk/storage/azblob/blockblob/client.go @@ -12,9 +12,11 @@ import ( "encoding/base64" "errors" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "io" "math" "os" + "reflect" "sync" "time" @@ -172,6 +174,13 @@ func (bb *Client) Upload(ctx context.Context, body io.ReadSeekCloser, options *U opts, httpHeaders, leaseInfo, cpkV, cpkN, accessConditions := options.format() + if options != nil && options.TransactionalValidation != nil { + body, err = options.TransactionalValidation.Apply(body, opts) + if err != nil { + return UploadResponse{}, err + } + } + resp, err := bb.generated().Upload(ctx, count, body, opts, httpHeaders, leaseInfo, cpkV, cpkN, accessConditions) return resp, err } @@ -260,6 +269,11 @@ func (bb *Client) CommitBlockList(ctx context.Context, base64BlockIDs []string, ImmutabilityPolicyExpiry: options.ImmutabilityPolicyExpiryTime, } + // If user attempts to pass in their own checksum, errors out. + if options.TransactionalContentMD5 != nil || options.TransactionalContentCRC64 != nil { + return CommitBlockListResponse{}, bloberror.UnsupportedChecksum + } + headers = options.HTTPHeaders leaseAccess, modifiedAccess = exported.FormatBlobAccessConditions(options.AccessConditions) cpkInfo = options.CPKInfo @@ -506,6 +520,12 @@ func (bb *Client) UploadBuffer(ctx context.Context, buffer []byte, o *UploadBuff if o != nil { uploadOptions = *o } + + // If user attempts to pass in their own checksum, errors out. + if uploadOptions.TransactionalValidation != nil && reflect.TypeOf(uploadOptions.TransactionalValidation).Kind() != reflect.Func { + return UploadBufferResponse{}, bloberror.UnsupportedChecksum + } + return bb.uploadFromReader(ctx, bytes.NewReader(buffer), int64(len(buffer)), &uploadOptions) } @@ -519,6 +539,12 @@ func (bb *Client) UploadFile(ctx context.Context, file *os.File, o *UploadFileOp if o != nil { uploadOptions = *o } + + // If user attempts to pass in their own checksum, errors out. + if uploadOptions.TransactionalValidation != nil && reflect.TypeOf(uploadOptions.TransactionalValidation).Kind() != reflect.Func { + return UploadFileResponse{}, bloberror.UnsupportedChecksum + } + return bb.uploadFromReader(ctx, file, stat.Size(), &uploadOptions) } @@ -529,6 +555,11 @@ func (bb *Client) UploadStream(ctx context.Context, body io.Reader, o *UploadStr o = &UploadStreamOptions{} } + // If user attempts to pass in their own checksum, errors out. + if o.TransactionalValidation != nil && reflect.TypeOf(o.TransactionalValidation).Kind() != reflect.Func { + return UploadStreamResponse{}, bloberror.UnsupportedChecksum + } + result, err := copyFromReader(ctx, body, bb, *o, newMMBPool) if err != nil { return CommitBlockListResponse{}, err diff --git a/sdk/storage/azblob/blockblob/client_test.go b/sdk/storage/azblob/blockblob/client_test.go index 981f0d9ec855..f0e8af99c17f 100644 --- a/sdk/storage/azblob/blockblob/client_test.go +++ b/sdk/storage/azblob/blockblob/client_test.go @@ -257,6 +257,187 @@ type BlockBlobUnrecordedTestsSuite struct { // _require.Nil(err) // _require.EqualValues(destData, content) // } + +func (s *BlockBlobUnrecordedTestsSuite) TestStageBlockFromURLWithMD5() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 4 * 1024 // 4 KB + r, sourceData := testcommon.GetDataAndReader(testName, contentSize) + contentMD5 := md5.Sum(sourceData) + rsc := streaming.NopCloser(r) + + srcBlob := containerClient.NewBlockBlobClient("src" + testcommon.GenerateBlobName(testName)) + destBlob := containerClient.NewBlockBlobClient("dst" + testcommon.GenerateBlobName(testName)) + + // Prepare source bbClient for copy. + _, err = srcBlob.Upload(context.Background(), rsc, nil) + _require.Nil(err) + + // Get source blob url with SAS for StageFromURL. + srcBlobParts, _ := blob.ParseURL(srcBlob.URL()) + credential, err := testcommon.GetGenericSharedKeyCredential(testcommon.TestAccountDefault) + _require.Nil(err) + perms := sas.BlobPermissions{Read: true} + + srcBlobParts.SAS, err = sas.BlobSignatureValues{ + Protocol: sas.ProtocolHTTPS, // Users MUST use HTTPS (not HTTP) + ExpiryTime: time.Now().UTC().Add(48 * time.Hour), // 48-hours before expiration + ContainerName: srcBlobParts.ContainerName, + BlobName: srcBlobParts.BlobName, + Permissions: perms.String(), + }.SignWithSharedKey(credential) + _require.NoError(err) + + srcBlobURLWithSAS := srcBlobParts.String() + + // Stage blocks from URL. + blockIDs := testcommon.GenerateBlockIDsList(2) + + opts := blockblob.StageBlockFromURLOptions{ + SourceContentValidation: blob.SourceContentValidationTypeMD5(contentMD5[:]), + } + + stageResp1, err := destBlob.StageBlockFromURL(context.Background(), blockIDs[0], srcBlobURLWithSAS, &opts) + _require.Nil(err) + _require.Equal(stageResp1.ContentMD5, contentMD5[:]) + + stageResp2, err := destBlob.StageBlockFromURL(context.Background(), blockIDs[1], srcBlobURLWithSAS, &opts) + _require.Nil(err) + _require.Equal(stageResp2.ContentMD5, contentMD5[:]) + + // Check block list. + blockList, err := destBlob.GetBlockList(context.Background(), blockblob.BlockListTypeAll, nil) + _require.Nil(err) + _require.NotNil(blockList.BlockList) + _require.Nil(blockList.BlockList.CommittedBlocks) + _require.NotNil(blockList.BlockList.UncommittedBlocks) + _require.Len(blockList.BlockList.UncommittedBlocks, 2) + + // Commit block list. + _, err = destBlob.CommitBlockList(context.Background(), blockIDs, nil) + _require.Nil(err) + + // Check data integrity through downloading. + destBuffer := make([]byte, 4*1024) + downloadBufferOptions := blob.DownloadBufferOptions{Range: blob.HTTPRange{Offset: 0, Count: 4096}} + _, err = destBlob.DownloadBuffer(context.Background(), destBuffer, &downloadBufferOptions) + _require.Nil(err) + _require.Equal(destBuffer, sourceData) + + // Test stage block from URL with bad MD5 value + _, badMD5 := testcommon.GetDataAndReader(testName, 16) + var badMD5Validator blob.SourceContentValidationTypeMD5 = badMD5[:] + opts = blockblob.StageBlockFromURLOptions{ + SourceContentValidation: badMD5Validator, + } + _, err = destBlob.StageBlockFromURL(context.Background(), blockIDs[0], srcBlobURLWithSAS, &opts) + _require.NotNil(err) + testcommon.ValidateBlobErrorCode(_require, err, bloberror.MD5Mismatch) + + _, err = destBlob.StageBlockFromURL(context.Background(), blockIDs[1], srcBlobURLWithSAS, &opts) + _require.NotNil(err) + testcommon.ValidateBlobErrorCode(_require, err, bloberror.MD5Mismatch) +} + +func (s *BlockBlobUnrecordedTestsSuite) TestStageBlockFromURLWithCRC64() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 4 * 1024 // 4 KB + r, sourceData := testcommon.GetDataAndReader(testName, contentSize) + rsc := streaming.NopCloser(r) + crc64Value := crc64.Checksum(sourceData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + srcBlob := containerClient.NewBlockBlobClient("src" + testcommon.GenerateBlobName(testName)) + destBlob := containerClient.NewBlockBlobClient("dst" + testcommon.GenerateBlobName(testName)) + + // Prepare source bbClient for copy. + _, err = srcBlob.Upload(context.Background(), rsc, nil) + _require.Nil(err) + + // Get source blob url with SAS for StageFromURL. + srcBlobParts, _ := blob.ParseURL(srcBlob.URL()) + credential, err := testcommon.GetGenericSharedKeyCredential(testcommon.TestAccountDefault) + _require.Nil(err) + perms := sas.BlobPermissions{Read: true} + + srcBlobParts.SAS, err = sas.BlobSignatureValues{ + Protocol: sas.ProtocolHTTPS, // Users MUST use HTTPS (not HTTP) + ExpiryTime: time.Now().UTC().Add(48 * time.Hour), // 48-hours before expiration + ContainerName: srcBlobParts.ContainerName, + BlobName: srcBlobParts.BlobName, + Permissions: perms.String(), + }.SignWithSharedKey(credential) + _require.NoError(err) + + srcBlobURLWithSAS := srcBlobParts.String() + + // Stage blocks from URL. + blockIDs := testcommon.GenerateBlockIDsList(2) + + opts := blockblob.StageBlockFromURLOptions{ + SourceContentValidation: blob.SourceContentValidationTypeCRC64(crc), + } + + stageResp1, err := destBlob.StageBlockFromURL(context.Background(), blockIDs[0], srcBlobURLWithSAS, &opts) + _require.Nil(err) + _require.Equal(stageResp1.ContentCRC64, crc) + + stageResp2, err := destBlob.StageBlockFromURL(context.Background(), blockIDs[1], srcBlobURLWithSAS, &opts) + _require.Nil(err) + _require.Equal(stageResp2.ContentCRC64, crc) + + // Check block list. + blockList, err := destBlob.GetBlockList(context.Background(), blockblob.BlockListTypeAll, nil) + _require.Nil(err) + _require.NotNil(blockList.BlockList) + _require.Nil(blockList.BlockList.CommittedBlocks) + _require.NotNil(blockList.BlockList.UncommittedBlocks) + _require.Len(blockList.BlockList.UncommittedBlocks, 2) + + // Commit block list. + _, err = destBlob.CommitBlockList(context.Background(), blockIDs, nil) + _require.Nil(err) + + // Check data integrity through downloading. + destBuffer := make([]byte, 4*1024) + downloadBufferOptions := blob.DownloadBufferOptions{Range: blob.HTTPRange{Offset: 0, Count: 4096}} + _, err = destBlob.DownloadBuffer(context.Background(), destBuffer, &downloadBufferOptions) + _require.Nil(err) + _require.Equal(destBuffer, sourceData) + + // Test stage block from URL with bad CRC64 value + _, sourceData = testcommon.GetDataAndReader(testName, 16) + crc64Value = crc64.Checksum(sourceData, shared.CRC64Table) + badCRC := make([]byte, 8) + binary.LittleEndian.PutUint64(badCRC, crc64Value) + opts = blockblob.StageBlockFromURLOptions{ + SourceContentValidation: blob.SourceContentValidationTypeCRC64(badCRC), + } + _, err = destBlob.StageBlockFromURL(context.Background(), blockIDs[0], srcBlobURLWithSAS, &opts) + _require.NotNil(err) + testcommon.ValidateBlobErrorCode(_require, err, bloberror.CRC64Mismatch) + + _, err = destBlob.StageBlockFromURL(context.Background(), blockIDs[1], srcBlobURLWithSAS, &opts) + _require.NotNil(err) + testcommon.ValidateBlobErrorCode(_require, err, bloberror.CRC64Mismatch) +} + // // func (s *BlockBlobUnrecordedTestsSuite) TestCopyBlockBlobFromURL() { // _require := require.New(s.T()) @@ -421,7 +602,7 @@ type BlockBlobUnrecordedTestsSuite struct { // } // nolint -func (s *BlockBlobUnrecordedTestsSuite) TestStageBlockWithGeneratedCRC64() { +func (s *BlockBlobRecordedTestsSuite) TestStageBlockWithGeneratedCRC64() { _require := require.New(s.T()) testName := s.T().Name() svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) @@ -434,7 +615,7 @@ func (s *BlockBlobUnrecordedTestsSuite) TestStageBlockWithGeneratedCRC64() { blobName := testcommon.GenerateBlobName(testName) bbClient := containerClient.NewBlockBlobClient(blobName) - // test put block with valid CRC64 value + // test stage block with valid CRC64 value contentSize := 8 * 1024 // 8 KB content := make([]byte, contentSize) body := bytes.NewReader(content) @@ -446,24 +627,49 @@ func (s *BlockBlobUnrecordedTestsSuite) TestStageBlockWithGeneratedCRC64() { TransactionalValidation: blob.TransferValidationTypeComputeCRC64(), }) _require.Nil(err) - // _require.Equal(putResp.RawResponse.StatusCode, 201) _require.NotNil(putResp.ContentCRC64) _require.EqualValues(binary.LittleEndian.Uint64(putResp.ContentCRC64), contentCrc64) _require.NotNil(putResp.RequestID) _require.NotNil(putResp.Version) _require.NotNil(putResp.Date) _require.Equal((*putResp.Date).IsZero(), false) +} + +func (s *BlockBlobRecordedTestsSuite) TestStageBlockWithCRC64() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + blobName := testcommon.GenerateBlobName(testName) + bbClient := containerClient.NewBlockBlobClient(blobName) + + // test stage block with valid CRC64 value + contentSize := 8 * 1024 // 8 KB + content := make([]byte, contentSize) + body := bytes.NewReader(content) + contentCrc64 := crc64.Checksum(content, shared.CRC64Table) + rsc := streaming.NopCloser(body) + + blockID1 := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%6d", 0))) + putResp, err := bbClient.StageBlock(context.Background(), blockID1, rsc, &blockblob.StageBlockOptions{ + TransactionalValidation: blob.TransferValidationTypeCRC64(contentCrc64), + }) + _require.Nil(err) + _require.EqualValues(binary.LittleEndian.Uint64(putResp.ContentCRC64), contentCrc64) // test put block with bad CRC64 value badContentCrc64 := rand.Uint64() - _, _ = rsc.Seek(0, io.SeekStart) blockID2 := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%6d", 1))) _, err = bbClient.StageBlock(context.Background(), blockID2, rsc, &blockblob.StageBlockOptions{ TransactionalValidation: blob.TransferValidationTypeCRC64(badContentCrc64), }) _require.NotNil(err) - _require.Contains(err.Error(), bloberror.CRC64Mismatch) } // nolint @@ -480,7 +686,7 @@ func (s *BlockBlobRecordedTestsSuite) TestStageBlockWithMD5() { blobName := testcommon.GenerateBlobName(testName) bbClient := containerClient.NewBlockBlobClient(blobName) - // test put block with valid MD5 value + // test stage block with valid MD5 value contentSize := 8 * 1024 // 8 KB content := make([]byte, contentSize) body := bytes.NewReader(content) @@ -493,14 +699,13 @@ func (s *BlockBlobRecordedTestsSuite) TestStageBlockWithMD5() { TransactionalValidation: blob.TransferValidationTypeMD5(contentMD5), }) _require.Nil(err) - // _require.Equal(putResp.RawResponse.StatusCode, 201) _require.EqualValues(putResp.ContentMD5, contentMD5) _require.NotNil(putResp.RequestID) _require.NotNil(putResp.Version) _require.NotNil(putResp.Date) _require.Equal((*putResp.Date).IsZero(), false) - // test put block with bad MD5 value + // test stage block with bad MD5 value _, badContent := testcommon.GetDataAndReader(testName, contentSize) badMD5Value := md5.Sum(badContent) badContentMD5 := badMD5Value[:] @@ -514,6 +719,39 @@ func (s *BlockBlobRecordedTestsSuite) TestStageBlockWithMD5() { _require.Contains(err.Error(), bloberror.MD5Mismatch) } +func (s *BlockBlobRecordedTestsSuite) TestPutBlobWithCRC64() { + s.T().Skip("Content CRC64 cannot be validated in Upload()") + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := "test" + testcommon.GenerateContainerName(testName) + "1" + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 4 * 1024 // 4 KB + r, sourceData := testcommon.GetDataAndReader(testName, contentSize) + rsc := streaming.NopCloser(r) + crc64Value := crc64.Checksum(sourceData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + blobName := testcommon.GenerateBlobName(testName) + bbClient := testcommon.GetBlockBlobClient(blobName, containerClient) + + blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%6d", 0))) + _, err = bbClient.StageBlock(context.Background(), blockID, streaming.NopCloser(strings.NewReader(testcommon.BlockBlobDefaultData)), nil) + _require.Nil(err) + + _, err = bbClient.Upload(context.Background(), rsc, &blockblob.UploadOptions{ + TransactionalValidation: blob.TransferValidationTypeCRC64(crc64Value), + }) + _require.Error(err, bloberror.UnsupportedChecksum) + // TODO: UploadResponse does not return ContentCRC64 + // _require.Equal(resp.ContentCRC64, crc) +} + func (s *BlockBlobRecordedTestsSuite) TestBlobPutBlobHTTPHeaders() { _require := require.New(s.T()) testName := s.T().Name() @@ -2012,6 +2250,64 @@ func (s *BlockBlobRecordedTestsSuite) TestBlobSetTierOnCommit() { } } +func (s *BlockBlobRecordedTestsSuite) TestCommitBlockListWithMD5() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := "test" + testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 4 * 1024 // 4 KB + _, sourceData := testcommon.GetDataAndReader(testName, contentSize) + contentMD5 := md5.Sum(sourceData) + + blobName := testcommon.GenerateBlobName(testName) + bbClient := testcommon.GetBlockBlobClient(blobName, containerClient) + + blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%6d", 0))) + _, err = bbClient.StageBlock(context.Background(), blockID, streaming.NopCloser(strings.NewReader(testcommon.BlockBlobDefaultData)), nil) + _require.Nil(err) + + // CommitBlockList is a multipart upload, user generated checksum cannot be passed + _, err = bbClient.CommitBlockList(context.Background(), []string{blockID}, &blockblob.CommitBlockListOptions{ + TransactionalContentMD5: contentMD5[:], + }) + _require.Error(err, bloberror.UnsupportedChecksum) +} + +func (s *BlockBlobRecordedTestsSuite) TestCommitBlockListWithCRC64() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := "test" + testcommon.GenerateContainerName(testName) + "1" + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 4 * 1024 // 4 KB + _, sourceData := testcommon.GetDataAndReader(testName, contentSize) + crc64Value := crc64.Checksum(sourceData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + blobName := testcommon.GenerateBlobName(testName) + bbClient := testcommon.GetBlockBlobClient(blobName, containerClient) + + blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%6d", 0))) + _, err = bbClient.StageBlock(context.Background(), blockID, streaming.NopCloser(strings.NewReader(testcommon.BlockBlobDefaultData)), nil) + _require.Nil(err) + + // CommitBlockList is a multipart upload, user generated checksum cannot be passed + _, err = bbClient.CommitBlockList(context.Background(), []string{blockID}, &blockblob.CommitBlockListOptions{ + TransactionalContentCRC64: crc, + }) + _require.Error(err, bloberror.UnsupportedChecksum) +} + func (s *BlockBlobUnrecordedTestsSuite) TestSetTierOnCopyBlockBlobFromURL() { _require := require.New(s.T()) testName := s.T().Name() @@ -2071,6 +2367,59 @@ func (s *BlockBlobUnrecordedTestsSuite) TestSetTierOnCopyBlockBlobFromURL() { } } +func (s *BlockBlobUnrecordedTestsSuite) TestCopyBlockBlobFromUrlSourceContentMD5() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 8 * 1024 // 8 KB + body, sourceData := testcommon.GetDataAndReader(testName, contentSize) + contentMD5 := md5.Sum(sourceData) + + srcBlob := containerClient.NewBlockBlobClient("src" + testName) + destBlob := containerClient.NewBlockBlobClient("dest" + testName) + + // Prepare source bbClient for copy. + _, err = srcBlob.Upload(context.Background(), streaming.NopCloser(body), nil) + _require.Nil(err) + + credential, err := testcommon.GetGenericSharedKeyCredential(testcommon.TestAccountDefault) + _require.NoError(err) + + // Get source blob url with SAS for CopyFromURL. + sasQueryParams, err := sas.AccountSignatureValues{ + Protocol: sas.ProtocolHTTPS, + ExpiryTime: time.Now().UTC().Add(48 * time.Hour), // 48-hours before expiration, + Permissions: to.Ptr(sas.AccountPermissions{Read: true, List: true}).String(), + ResourceTypes: to.Ptr(sas.AccountResourceTypes{Container: true, Object: true}).String(), + }.SignWithSharedKey(credential) + _require.Nil(err) + + srcBlobParts, _ := blob.ParseURL(srcBlob.URL()) + srcBlobParts.SAS = sasQueryParams + srcBlobURLWithSAS := srcBlobParts.String() + + // Invoke CopyFromURL. + sourceContentMD5 := contentMD5[:] + resp, err := destBlob.CopyFromURL(context.Background(), srcBlobURLWithSAS, &blob.CopyFromURLOptions{ + SourceContentMD5: sourceContentMD5, + }) + _require.Nil(err) + _require.EqualValues(resp.ContentMD5, sourceContentMD5) + + // Provide bad MD5 and make sure the copy fails + _, badMD5 := testcommon.GetDataAndReader(testName, 16) + resp, err = destBlob.CopyFromURL(context.Background(), srcBlobURLWithSAS, &blob.CopyFromURLOptions{ + SourceContentMD5: badMD5, + }) + _require.NotNil(err) +} + // func (s *BlockBlobUnrecordedTestsSuite) TestSetTierOnStageBlockFromURL() { // _require := require.New(s.T()) // testName := s.T().Name() @@ -3997,6 +4346,46 @@ func (s *BlockBlobRecordedTestsSuite) TestPutBlockAndPutBlockListWithCPKByScope( // _require.EqualValues(*downloadResp.EncryptionScope, *testcommon.TestCPKByScope.EncryptionScope) // } +func (s *BlockBlobRecordedTestsSuite) TestUploadBlobWithMD5() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerClient := testcommon.CreateNewContainer(context.Background(), _require, testcommon.GenerateContainerName(testName), svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + contentSize := 8 * 1024 + r, srcData := testcommon.GenerateData(contentSize) + md5Val := md5.Sum(srcData) + bbClient := containerClient.NewBlockBlobClient(testcommon.GenerateBlobName(testName)) + + uploadBlockBlobOptions := blockblob.UploadOptions{ + TransactionalValidation: blob.TransferValidationTypeMD5(md5Val[:]), + } + uploadResp, err := bbClient.Upload(context.Background(), r, &uploadBlockBlobOptions) + _require.Nil(err) + _require.Equal(uploadResp.ContentMD5, md5Val[:]) + + // Download blob to do data integrity check. + downloadResp, err := bbClient.DownloadStream(context.Background(), nil) + _require.Nil(err) + _require.EqualValues(downloadResp.ContentMD5, md5Val[:]) + destData, err := io.ReadAll(downloadResp.Body) + _require.Nil(err) + _require.EqualValues(destData, srcData) + + // Test Upload with bad MD5 + _, badMD5 := testcommon.GetDataAndReader(testName, 16) + var badMD5Validator blob.TransferValidationTypeMD5 = badMD5[:] + + uploadBlockBlobOptions = blockblob.UploadOptions{ + TransactionalValidation: badMD5Validator, + } + uploadResp, err = bbClient.Upload(context.Background(), r, &uploadBlockBlobOptions) + _require.NotNil(err) +} + func (s *BlockBlobRecordedTestsSuite) TestUploadBlobWithMD5WithCPK() { _require := require.New(s.T()) testName := s.T().Name() @@ -4680,6 +5069,73 @@ func (s *BlockBlobUnrecordedTestsSuite) TestUploadFromReader() { } } +/* siminsavani: This test has a large allocation and blocks other tests from running that's why this test is commented out +func (s *BlockBlobUnrecordedTestsSuite) TestLargeBlockBufferedUploadInParallelWithGeneratedCRC64() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient) + + var largeBlockSize, numberOfBlocks int64 = 2500 * 1024 * 1024, 2 + _, sourceData := testcommon.GetDataAndReader(testName, int(numberOfBlocks*largeBlockSize)) + // rsc := streaming.NopCloser(r) + crc64Value := crc64.Checksum(sourceData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + _, err = bbClient.UploadBuffer(context.Background(), sourceData, &blockblob.UploadBufferOptions{ + TransactionalValidation: blob.TransferValidationTypeComputeCRC64(), + BlockSize: largeBlockSize, + Concurrency: 2, + }) + _require.Nil(err) + + resp, err := bbClient.GetBlockList(context.Background(), blockblob.BlockListTypeAll, nil) + _require.Nil(err) + _require.Len(resp.BlockList.CommittedBlocks, 2) + _require.Equal(*resp.BlobContentLength, numberOfBlocks*largeBlockSize) + committed := resp.BlockList.CommittedBlocks + _require.Equal(*(committed[0].Size), largeBlockSize) + _require.Equal(*(committed[1].Size), largeBlockSize) +}*/ + +func (s *BlockBlobRecordedTestsSuite) TestUploadBufferWithCRC64OrMD5() { + _require := require.New(s.T()) + testName := s.T().Name() + svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient) + defer testcommon.DeleteContainer(context.Background(), _require, containerClient) + + bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient) + + _, content := testcommon.GetDataAndReader(testName, 1024) + md5Value := md5.Sum(content) + contentMD5 := md5Value[:] + + crc64Value := crc64.Checksum(content, shared.CRC64Table) + + _, err = bbClient.UploadBuffer(context.Background(), content, &blockblob.UploadBufferOptions{ + TransactionalValidation: blob.TransferValidationTypeCRC64(crc64Value), + }) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) + + _, err = bbClient.UploadBuffer(context.Background(), content, &blockblob.UploadBufferOptions{ + TransactionalValidation: blob.TransferValidationTypeMD5(contentMD5), + }) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) +} + func (s *BlockBlobRecordedTestsSuite) TestBlockGetAccountInfo() { _require := require.New(s.T()) testName := s.T().Name() diff --git a/sdk/storage/azblob/blockblob/models.go b/sdk/storage/azblob/blockblob/models.go index ba1b9ee9f67b..662e78e5eac7 100644 --- a/sdk/storage/azblob/blockblob/models.go +++ b/sdk/storage/azblob/blockblob/models.go @@ -36,8 +36,9 @@ type UploadOptions struct { // Optional. Indicates the tier to be set on the blob. Tier *blob.AccessTier - // Specify the transactional md5 for the body, to be validated by the service. - TransactionalContentMD5 []byte + // TransactionalValidation specifies the transfer validation type to use. + // The default is nil (no transfer validation). + TransactionalValidation blob.TransferValidationType HTTPHeaders *blob.HTTPHeaders CPKInfo *blob.CPKInfo @@ -46,6 +47,9 @@ type UploadOptions struct { LegalHold *bool ImmutabilityPolicyMode *blob.ImmutabilityPolicySetting ImmutabilityPolicyExpiryTime *time.Time + + // Deprecated: TransactionalContentMD5 can be set by using TransactionalValidation instead + TransactionalContentMD5 []byte } func (o *UploadOptions) format() (*generated.BlockBlobClientUploadOptions, *generated.BlobHTTPHeaders, *generated.LeaseAccessConditions, @@ -190,8 +194,6 @@ type CommitBlockListOptions struct { RequestID *string Tier *blob.AccessTier Timeout *int32 - TransactionalContentCRC64 []byte - TransactionalContentMD5 []byte HTTPHeaders *blob.HTTPHeaders CPKInfo *blob.CPKInfo CPKScopeInfo *blob.CPKScopeInfo @@ -199,6 +201,12 @@ type CommitBlockListOptions struct { LegalHold *bool ImmutabilityPolicyMode *blob.ImmutabilityPolicySetting ImmutabilityPolicyExpiryTime *time.Time + + // Deprecated: TransactionalContentCRC64 cannot be generated + TransactionalContentCRC64 []byte + + // Deprecated: TransactionalContentMD5 cannot be generated + TransactionalContentMD5 []byte } // --------------------------------------------------------------------------------------------------------------------- @@ -253,9 +261,10 @@ type uploadFromReaderOptions struct { TransactionalValidation blob.TransferValidationType - // Optional header, Specifies the transactional crc64 for the body, to be validated by the service. + // Deprecated: TransactionalContentCRC64 cannot be generated at block level TransactionalContentCRC64 uint64 - // Specify the transactional md5 for the body, to be validated by the service. + + // Deprecated: TransactionalContentMD5 cannot be generated at block level TransactionalContentMD5 []byte } diff --git a/sdk/storage/azblob/client_test.go b/sdk/storage/azblob/client_test.go index f04d5190a59a..579b4751eb92 100644 --- a/sdk/storage/azblob/client_test.go +++ b/sdk/storage/azblob/client_test.go @@ -8,8 +8,12 @@ package azblob_test import ( "context" + "crypto/md5" + "encoding/binary" "errors" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "hash/crc64" "io" "os" "sync/atomic" @@ -77,6 +81,60 @@ func generateFile(fileName string, fileSize int) []byte { return bigBuff } +func performUploadStreamToBlockBlobTestWithChecksums(t *testing.T, _require *require.Assertions, testName string, blobSize, bufferSize, maxBuffers int) { + client, err := testcommon.GetClient(t, testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + _, err = client.CreateContainer(context.Background(), containerName, nil) + _require.NoError(err) + defer func() { + _, err := client.DeleteContainer(context.Background(), containerName, nil) + _require.NoError(err) + }() + + // Set up test blob + blobName := testcommon.GenerateBlobName(testName) + + // Create some data to test the upload stream + blobContentReader, blobData := testcommon.GenerateData(blobSize) + crc64Value := crc64.Checksum(blobData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + // Perform UploadStream + _, err = client.UploadStream(ctx, containerName, blobName, blobContentReader, + &blockblob.UploadStreamOptions{BlockSize: int64(bufferSize), Concurrency: maxBuffers, TransactionalValidation: blob.TransferValidationTypeComputeCRC64()}) + _require.NoError(err) + + // TODO: UploadResponse does not return ContentCRC64 + // // Assert that upload was successful + // _require.Equal(uploadResp.ContentCRC64, crc) + + // UploadStream does not accept user generated checksum, should return UnsupportedChecksum + _, err = client.UploadStream(ctx, containerName, blobName, blobContentReader, + &blockblob.UploadStreamOptions{BlockSize: int64(bufferSize), Concurrency: maxBuffers, TransactionalValidation: blob.TransferValidationTypeCRC64(crc64Value)}) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) + + md5Value := md5.Sum(blobData) + contentMD5 := md5Value[:] + + _, err = client.UploadStream(ctx, containerName, blobName, blobContentReader, + &blockblob.UploadStreamOptions{BlockSize: int64(bufferSize), Concurrency: maxBuffers, TransactionalValidation: blob.TransferValidationTypeMD5(contentMD5)}) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) +} + +func (s *AZBlobUnrecordedTestsSuite) TestUploadStreamToBlockBlobInChunksChecksums() { + blobSize := 8 * 1024 + bufferSize := 1024 + maxBuffers := 3 + _require := require.New(s.T()) + testName := s.T().Name() + performUploadStreamToBlockBlobTestWithChecksums(s.T(), _require, testName, blobSize, bufferSize, maxBuffers) +} + func performUploadStreamToBlockBlobTest(t *testing.T, _require *require.Assertions, testName string, blobSize, bufferSize, maxBuffers int) { client, err := testcommon.GetClient(t, testcommon.TestAccountDefault, nil) _require.NoError(err) @@ -159,6 +217,102 @@ func (s *AZBlobUnrecordedTestsSuite) TestUploadStreamToBlockBlobEmpty() { performUploadStreamToBlockBlobTest(s.T(), _require, testName, blobSize, bufferSize, maxBuffers) } +func performUploadAndDownloadFileTestWithChecksums(t *testing.T, _require *require.Assertions, testName string, fileSize, blockSize, concurrency, downloadOffset, downloadCount int) { + // Set up file to upload + fileName := "BigFile.bin" + fileData := generateFile(fileName, fileSize) + + // Open the file to upload + file, err := os.Open(fileName) + _require.NoError(err) + defer func(file *os.File) { + _ = file.Close() + }(file) + defer func(name string) { + _ = os.Remove(name) + }(fileName) + + //body := bytes.NewReader(fileData) + crc64Value := crc64.Checksum(fileData, shared.CRC64Table) + crc := make([]byte, 8) + binary.LittleEndian.PutUint64(crc, crc64Value) + + client, err := testcommon.GetClient(t, testcommon.TestAccountDefault, nil) + _require.NoError(err) + + containerName := testcommon.GenerateContainerName(testName) + _, err = client.CreateContainer(context.Background(), containerName, nil) + _require.NoError(err) + defer func() { + _, err := client.DeleteContainer(context.Background(), containerName, nil) + _require.NoError(err) + }() + + // Set up test blob + blobName := testcommon.GenerateBlobName(testName) + + // Upload the file to a block blob + var errTransferred error + _, err = client.UploadFile(context.Background(), containerName, blobName, file, + &blockblob.UploadFileOptions{ + BlockSize: int64(blockSize), + Concurrency: uint16(concurrency), + TransactionalValidation: blob.TransferValidationTypeComputeCRC64(), + // If Progress is non-nil, this function is called periodically as bytes are uploaded. + Progress: func(bytesTransferred int64) { + if bytesTransferred <= 0 || bytesTransferred > int64(fileSize) { + errTransferred = fmt.Errorf("invalid bytes transferred %d", bytesTransferred) + } + }, + }) + assert.NoError(t, errTransferred) + _require.NoError(err) + + // UploadFile does not accept user generated checksum, should return UnsupportedChecksum + _, err = client.UploadFile(context.Background(), containerName, blobName, file, + &blockblob.UploadFileOptions{ + BlockSize: int64(blockSize), + Concurrency: uint16(concurrency), + TransactionalValidation: blob.TransferValidationTypeCRC64(crc64Value), + // If Progress is non-nil, this function is called periodically as bytes are uploaded. + Progress: func(bytesTransferred int64) { + if bytesTransferred <= 0 || bytesTransferred > int64(fileSize) { + errTransferred = fmt.Errorf("invalid bytes transferred %d", bytesTransferred) + } + }, + }) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) + + md5Value := md5.Sum(fileData) + contentMD5 := md5Value[:] + + // UploadFile does not accept user generated checksum, should return UnsupportedChecksum + _, err = client.UploadFile(context.Background(), containerName, blobName, file, + &blockblob.UploadFileOptions{ + BlockSize: int64(blockSize), + Concurrency: uint16(concurrency), + TransactionalValidation: blob.TransferValidationTypeMD5(contentMD5), + // If Progress is non-nil, this function is called periodically as bytes are uploaded. + Progress: func(bytesTransferred int64) { + if bytesTransferred <= 0 || bytesTransferred > int64(fileSize) { + errTransferred = fmt.Errorf("invalid bytes transferred %d", bytesTransferred) + } + }, + }) + _require.NotNil(err) + _require.Error(err, bloberror.UnsupportedChecksum) +} + +func (s *AZBlobUnrecordedTestsSuite) TestUploadFileInChunksChecksum() { + fileSize := 8 * 1024 + blockSize := 1024 + concurrency := 3 + _require := require.New(s.T()) + testName := s.T().Name() + performUploadAndDownloadFileTestWithChecksums(s.T(), _require, testName, fileSize, blockSize, concurrency, 0, 0) +} + func performUploadAndDownloadFileTest(t *testing.T, _require *require.Assertions, testName string, fileSize, blockSize, concurrency, downloadOffset, downloadCount int) { // Set up file to upload fileName := "BigFile.bin" diff --git a/sdk/storage/azblob/internal/generated/models.go b/sdk/storage/azblob/internal/generated/models.go index 2bfb97ff873b..eee488ddf1b4 100644 --- a/sdk/storage/azblob/internal/generated/models.go +++ b/sdk/storage/azblob/internal/generated/models.go @@ -41,6 +41,14 @@ func (p *PageBlobClientUploadPagesOptions) SetMD5(v []byte) { p.TransactionalContentMD5 = v } +func (b *BlockBlobClientUploadOptions) SetCRC64(v []byte) { + b.TransactionalContentCRC64 = v +} + +func (b *BlockBlobClientUploadOptions) SetMD5(v []byte) { + b.TransactionalContentMD5 = v +} + type SourceContentSetter interface { SetSourceContentCRC64(v []byte) SetSourceContentMD5(v []byte)