diff --git a/sdk/storage/azfile/assets.json b/sdk/storage/azfile/assets.json index d22014681ec8..079ab96fc36d 100644 --- a/sdk/storage/azfile/assets.json +++ b/sdk/storage/azfile/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "go", "TagPrefix": "go/storage/azfile", - "Tag": "go/storage/azfile_413a85db89" + "Tag": "go/storage/azfile_f16684c6c5" } diff --git a/sdk/storage/azfile/file/chunkwriting.go b/sdk/storage/azfile/file/chunkwriting.go new file mode 100644 index 000000000000..21070c19bcad --- /dev/null +++ b/sdk/storage/azfile/file/chunkwriting.go @@ -0,0 +1,189 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package file + +import ( + "bytes" + "context" + "errors" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" + "io" + "sync" +) + +// chunkWriter provides methods to upload chunks that represent a file to a server. +// This allows us to provide a local implementation that fakes the server for hermetic testing. +type chunkWriter interface { + UploadRange(context.Context, int64, io.ReadSeekCloser, *UploadRangeOptions) (UploadRangeResponse, error) +} + +// bufferManager provides an abstraction for the management of buffers. +// this is mostly for testing purposes, but does allow for different implementations without changing the algorithm. +type bufferManager[T ~[]byte] interface { + // Acquire returns the channel that contains the pool of buffers. + Acquire() <-chan T + + // Release releases the buffer back to the pool for reuse/cleanup. + Release(T) + + // Grow grows the number of buffers, up to the predefined max. + // It returns the total number of buffers or an error. + // No error is returned if the number of buffers has reached max. + // This is called only from the reading goroutine. + Grow() (int, error) + + // Free cleans up all buffers. + Free() +} + +// copyFromReader copies a source io.Reader to file storage using concurrent uploads. +func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst chunkWriter, options UploadStreamOptions, getBufferManager func(maxBuffers int, bufferSize int64) bufferManager[T]) error { + options.setDefaults() + + wg := sync.WaitGroup{} // Used to know when all outgoing chunks have finished processing + errCh := make(chan error, 1) // contains the first error encountered during processing + var err error + + buffers := getBufferManager(options.Concurrency, options.ChunkSize) + defer buffers.Free() + + // this controls the lifetime of the uploading goroutines. + // if an error is encountered, cancel() is called which will terminate all uploads. + // NOTE: the ordering is important here. cancel MUST execute before + // cleaning up the buffers so that any uploading goroutines exit first, + // releasing their buffers back to the pool for cleanup. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // This goroutine grabs a buffer, reads from the stream into the buffer, + // then creates a goroutine to upload/stage the chunk. + for chunkNum := uint32(0); true; chunkNum++ { + var buffer T + select { + case buffer = <-buffers.Acquire(): + // got a buffer + default: + // no buffer available; allocate a new buffer if possible + if _, err := buffers.Grow(); err != nil { + return err + } + + // either grab the newly allocated buffer or wait for one to become available + buffer = <-buffers.Acquire() + } + + var n int + n, err = io.ReadFull(src, buffer) + + if n > 0 { + // some data was read, upload it + wg.Add(1) // We're posting a buffer to be sent + + // NOTE: we must pass chunkNum as an arg to our goroutine else + // it's captured by reference and can change underneath us! + go func(chunkNum uint32) { + // Upload the outgoing chunk, matching the number of bytes read + offset := int64(chunkNum) * options.ChunkSize + uploadRangeOptions := options.getUploadRangeOptions() + _, err := dst.UploadRange(ctx, offset, streaming.NopCloser(bytes.NewReader(buffer[:n])), uploadRangeOptions) + if err != nil { + select { + case errCh <- err: + // error was set + default: + // some other error is already set + } + cancel() + } + buffers.Release(buffer) // The goroutine reading from the stream can reuse this buffer now + + // signal that the chunk has been staged. + // we MUST do this after attempting to write to errCh + // to avoid it racing with the reading goroutine. + wg.Done() + }(chunkNum) + } else { + // nothing was read so the buffer is empty, send it back for reuse/clean-up. + buffers.Release(buffer) + } + + if err != nil { // The reader is done, no more outgoing buffers + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + // these are expected errors, we don't surface those + err = nil + } else { + // some other error happened, terminate any outstanding uploads + cancel() + } + break + } + } + + wg.Wait() // Wait for all outgoing chunks to complete + + if err != nil { + // there was an error reading from src, favor this error over any error during staging + return err + } + + select { + case err = <-errCh: + // there was an error during staging + return err + default: + // no error was encountered + } + + // All chunks uploaded, return nil error + return nil +} + +// mmbPool implements the bufferManager interface. +// it uses anonymous memory mapped files for buffers. +// don't use this type directly, use newMMBPool() instead. +type mmbPool struct { + buffers chan mmb + count int + max int + size int64 +} + +func newMMBPool(maxBuffers int, bufferSize int64) bufferManager[mmb] { + return &mmbPool{ + buffers: make(chan mmb, maxBuffers), + max: maxBuffers, + size: bufferSize, + } +} + +func (pool *mmbPool) Acquire() <-chan mmb { + return pool.buffers +} + +func (pool *mmbPool) Grow() (int, error) { + if pool.count < pool.max { + buffer, err := newMMB(pool.size) + if err != nil { + return 0, err + } + pool.buffers <- buffer + pool.count++ + } + return pool.count, nil +} + +func (pool *mmbPool) Release(buffer mmb) { + pool.buffers <- buffer +} + +func (pool *mmbPool) Free() { + for i := 0; i < pool.count; i++ { + buffer := <-pool.buffers + buffer.delete() + } + pool.count = 0 +} diff --git a/sdk/storage/azfile/file/client.go b/sdk/storage/azfile/file/client.go index 8546bb5a5763..6b060d589da2 100644 --- a/sdk/storage/azfile/file/client.go +++ b/sdk/storage/azfile/file/client.go @@ -7,9 +7,12 @@ package file import ( + "bytes" "context" + "errors" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/base" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/exported" @@ -19,6 +22,7 @@ import ( "io" "os" "strings" + "sync" "time" ) @@ -158,23 +162,6 @@ func (f *Client) AbortCopy(ctx context.Context, copyID string, options *AbortCop return resp, err } -// DownloadStream operation reads or downloads a file from the system, including its metadata and properties. -// For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/get-file. -func (f *Client) DownloadStream(ctx context.Context, options *DownloadStreamOptions) (DownloadStreamResponse, error) { - return DownloadStreamResponse{}, nil -} - -// DownloadBuffer downloads an Azure file to a buffer with parallel. -func (f *Client) DownloadBuffer(ctx context.Context, buffer []byte, o *DownloadBufferOptions) (int64, error) { - return 0, nil -} - -// DownloadFile downloads an Azure file to a local file. -// The file would be truncated if the size doesn't match. -func (f *Client) DownloadFile(ctx context.Context, file *os.File, o *DownloadFileOptions) (int64, error) { - return 0, nil -} - // Resize operation resizes the file to the specified size. // For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/set-file-properties. func (f *Client) Resize(ctx context.Context, size int64, options *ResizeOptions) (ResizeResponse, error) { @@ -184,12 +171,18 @@ func (f *Client) Resize(ctx context.Context, size int64, options *ResizeOptions) } // UploadRange operation uploads a range of bytes to a file. -// - contentRange: Specifies the range of bytes to be written. +// - offset: Specifies the start byte at which the range of bytes is to be written. // - body: Specifies the data to be uploaded. // // For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/put-range. -func (f *Client) UploadRange(ctx context.Context, contentRange HTTPRange, body io.ReadSeekCloser, options *UploadRangeOptions) (UploadRangeResponse, error) { - return UploadRangeResponse{}, nil +func (f *Client) UploadRange(ctx context.Context, offset int64, body io.ReadSeekCloser, options *UploadRangeOptions) (UploadRangeResponse, error) { + rangeParam, contentLength, uploadRangeOptions, leaseAccessConditions, err := options.format(offset, body) + if err != nil { + return UploadRangeResponse{}, err + } + + resp, err := f.generated().UploadRange(ctx, rangeParam, RangeWriteTypeUpdate, contentLength, body, uploadRangeOptions, leaseAccessConditions) + return resp, err } // ClearRange operation clears the specified range and releases the space used in storage for that range. @@ -197,7 +190,13 @@ func (f *Client) UploadRange(ctx context.Context, contentRange HTTPRange, body i // // For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/put-range. func (f *Client) ClearRange(ctx context.Context, contentRange HTTPRange, options *ClearRangeOptions) (ClearRangeResponse, error) { - return ClearRangeResponse{}, nil + rangeParam, leaseAccessConditions, err := options.format(contentRange) + if err != nil { + return ClearRangeResponse{}, err + } + + resp, err := f.generated().UploadRange(ctx, rangeParam, RangeWriteTypeClear, 0, nil, nil, leaseAccessConditions) + return resp, err } // UploadRangeFromURL operation uploads a range of bytes to a file where the contents are read from a URL. @@ -206,8 +205,14 @@ func (f *Client) ClearRange(ctx context.Context, contentRange HTTPRange, options // - sourceRange: Bytes of source data in the specified range. // // For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/put-range-from-url. -func (f *Client) UploadRangeFromURL(ctx context.Context, copySource string, destinationRange HTTPRange, sourceRange HTTPRange, options *UploadRangeFromURLOptions) (UploadRangeFromURLResponse, error) { - return UploadRangeFromURLResponse{}, nil +func (f *Client) UploadRangeFromURL(ctx context.Context, copySource string, sourceOffset int64, destinationOffset int64, count int64, options *UploadRangeFromURLOptions) (UploadRangeFromURLResponse, error) { + destRange, opts, sourceModifiedAccessConditions, leaseAccessConditions, err := options.format(sourceOffset, destinationOffset, count) + if err != nil { + return UploadRangeFromURLResponse{}, err + } + + resp, err := f.generated().UploadRangeFromURL(ctx, destRange, copySource, 0, opts, sourceModifiedAccessConditions, leaseAccessConditions) + return resp, err } // GetRangeList operation returns the list of valid ranges for a file. @@ -266,3 +271,118 @@ func (f *Client) GetSASURL(permissions sas.FilePermissions, expiry time.Time, o return endpoint, nil } + +// Concurrent Upload Functions ----------------------------------------------------------------------------------------- + +// uploadFromReader uploads a buffer in chunks to an Azure file. +func (f *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, actualSize int64, o *uploadFromReaderOptions) error { + if actualSize > MaxFileSize { + return errors.New("buffer is too large to upload to a file") + } + if o.ChunkSize == 0 { + o.ChunkSize = MaxUpdateRangeBytes + } + + // TODO: Add logs + + progress := int64(0) + progressLock := &sync.Mutex{} + + err := shared.DoBatchTransfer(ctx, &shared.BatchTransferOptions{ + OperationName: "uploadFromReader", + TransferSize: actualSize, + ChunkSize: o.ChunkSize, + Concurrency: o.Concurrency, + Operation: func(ctx context.Context, offset int64, chunkSize int64) error { + // This function is called once per file range. + // It is passed this file's offset within the buffer and its count of bytes + // Prepare to read the proper range/section of the buffer + if chunkSize < o.ChunkSize { + // this is the last file range. Its actual size might be less + // than the calculated size due to rounding up of the payload + // size to fit in a whole number of chunks. + chunkSize = actualSize - offset + } + var body io.ReadSeeker = io.NewSectionReader(reader, offset, chunkSize) + if o.Progress != nil { + chunkProgress := int64(0) + body = streaming.NewRequestProgress(streaming.NopCloser(body), + func(bytesTransferred int64) { + diff := bytesTransferred - chunkProgress + chunkProgress = bytesTransferred + progressLock.Lock() // 1 goroutine at a time gets progress report + progress += diff + o.Progress(progress) + progressLock.Unlock() + }) + } + + uploadRangeOptions := o.getUploadRangeOptions() + _, err := f.UploadRange(ctx, offset, streaming.NopCloser(body), uploadRangeOptions) + return err + }, + }) + return err +} + +// UploadBuffer uploads a buffer in chunks to an Azure file. +func (f *Client) UploadBuffer(ctx context.Context, buffer []byte, options *UploadBufferOptions) error { + uploadOptions := uploadFromReaderOptions{} + if options != nil { + uploadOptions = *options + } + return f.uploadFromReader(ctx, bytes.NewReader(buffer), int64(len(buffer)), &uploadOptions) +} + +// UploadFile uploads a file in chunks to an Azure file. +func (f *Client) UploadFile(ctx context.Context, file *os.File, options *UploadFileOptions) error { + stat, err := file.Stat() + if err != nil { + return err + } + uploadOptions := uploadFromReaderOptions{} + if options != nil { + uploadOptions = *options + } + return f.uploadFromReader(ctx, file, stat.Size(), &uploadOptions) +} + +// UploadStream copies the file held in io.Reader to the file at fileClient. +// A Context deadline or cancellation will cause this to error. +func (f *Client) UploadStream(ctx context.Context, body io.Reader, options *UploadStreamOptions) error { + if options == nil { + options = &UploadStreamOptions{} + } + + err := copyFromReader(ctx, body, f, *options, newMMBPool) + return err +} + +// Concurrent Download Functions ----------------------------------------------------------------------------------------- + +// DownloadStream operation reads or downloads a file from the system, including its metadata and properties. +// For more information, see https://learn.microsoft.com/en-us/rest/api/storageservices/get-file. +func (f *Client) DownloadStream(ctx context.Context, options *DownloadStreamOptions) (DownloadStreamResponse, error) { + opts, leaseAccessConditions := options.format() + if options == nil { + options = &DownloadStreamOptions{} + } + + resp, err := f.generated().Download(ctx, opts, leaseAccessConditions) + return DownloadStreamResponse{ + DownloadResponse: resp, + client: f, + getInfo: httpGetterInfo{Range: options.Range, ETag: resp.ETag}, + }, err +} + +// DownloadBuffer downloads an Azure file to a buffer with parallel. +func (f *Client) DownloadBuffer(ctx context.Context, buffer []byte, o *DownloadBufferOptions) (int64, error) { + return 0, nil +} + +// DownloadFile downloads an Azure file to a local file. +// The file would be truncated if the size doesn't match. +func (f *Client) DownloadFile(ctx context.Context, file *os.File, o *DownloadFileOptions) (int64, error) { + return 0, nil +} diff --git a/sdk/storage/azfile/file/client_test.go b/sdk/storage/azfile/file/client_test.go index 9a61a5f696b8..28735c7b63a7 100644 --- a/sdk/storage/azfile/file/client_test.go +++ b/sdk/storage/azfile/file/client_test.go @@ -7,18 +7,28 @@ package file_test import ( + "bytes" "context" + "crypto/md5" + "crypto/rand" + "encoding/binary" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/shared" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/testcommon" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "hash/crc64" + "io" + "io/ioutil" + "os" "strings" "testing" "time" @@ -375,8 +385,7 @@ func (f *FileUnrecordedTestsSuite) TestFileGetSetPropertiesNonDefault() { _require.NoError(err) md5Str := "MDAwMDAwMDA=" - var testMd5 []byte - copy(testMd5[:], md5Str) + testMd5 := []byte(md5Str) creationTime := time.Now().Add(-time.Hour) lastWriteTime := time.Now().Add(-time.Minute * 15) @@ -460,8 +469,7 @@ func (f *FileRecordedTestsSuite) TestFilePreservePermissions() { attribs := getResp.FileAttributes md5Str := "MDAwMDAwMDA=" - var testMd5 []byte - copy(testMd5[:], md5Str) + testMd5 := []byte(md5Str) properties := file.SetHTTPHeadersOptions{ HTTPHeaders: &file.HTTPHeaders{ @@ -529,8 +537,7 @@ func (f *FileRecordedTestsSuite) TestFileGetSetPropertiesSnapshot() { _require.NoError(err) md5Str := "MDAwMDAwMDA=" - var testMd5 []byte - copy(testMd5[:], md5Str) + testMd5 := []byte(md5Str) fileSetHTTPHeadersOptions := file.SetHTTPHeadersOptions{ HTTPHeaders: &file.HTTPHeaders{ @@ -734,7 +741,6 @@ func (f *FileRecordedTestsSuite) TestFileStartCopyMetadata() { "Foo": to.Ptr("Foovalue"), "Bar": to.Ptr("Barvalue"), } - _, err = copyFClient.StartCopyFromURL(context.Background(), fClient.URL(), &file.StartCopyFromURLOptions{Metadata: basicMetadata}) _require.NoError(err) @@ -1412,7 +1418,357 @@ func (f *FileRecordedTestsSuite) TestSASFileClientSignNegative() { _require.Equal(err.Error(), "service SAS is missing at least one of these: ExpiryTime or Permissions") } -// TODO: Add tests for different options of StartCopyFromURL() +func (f *FileUnrecordedTestsSuite) TestFileUploadClearListRange() { + _require := require.New(f.T()) + testName := f.T().Name() + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareClient := testcommon.CreateNewShare(context.Background(), _require, testcommon.GenerateShareName(testName), svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 1024 * 1024 * 10 + fClient := shareClient.NewRootDirectoryClient().NewFileClient(testcommon.GenerateFileName(testName)) + _, err = fClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + contentSize := 1024 * 8 // 8KB + content := make([]byte, contentSize) + body := bytes.NewReader(content) + rsc := streaming.NopCloser(body) + md5Value := md5.Sum(content) + contentMD5 := md5Value[:] + + uResp, err := fClient.UploadRange(context.Background(), 0, rsc, &file.UploadRangeOptions{ + TransactionalValidation: file.TransferValidationTypeMD5(contentMD5), + }) + _require.NoError(err) + _require.NotNil(uResp.ContentMD5) + _require.EqualValues(uResp.ContentMD5, contentMD5) + + rangeList, err := fClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) + + cResp, err := fClient.ClearRange(context.Background(), file.HTTPRange{Offset: 0, Count: int64(contentSize)}, nil) + _require.NoError(err) + _require.Nil(cResp.ContentMD5) + + rangeList2, err := fClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList2.RequestID) +} + +func (f *FileUnrecordedTestsSuite) TestFileUploadRangeFromURL() { + _require := require.New(f.T()) + testName := f.T().Name() + + cred, err := testcommon.GetGenericSharedKeyCredential(testcommon.TestAccountDefault) + _require.NoError(err) + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareName := testcommon.GenerateShareName(testName) + shareClient := testcommon.CreateNewShare(context.Background(), _require, shareName, svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 1024 * 20 + srcFileName := "src" + testcommon.GenerateFileName(testName) + srcFClient := shareClient.NewRootDirectoryClient().NewFileClient(srcFileName) + _, err = srcFClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + contentSize := 1024 * 8 // 8KB + content := make([]byte, contentSize) + body := bytes.NewReader(content) + rsc := streaming.NopCloser(body) + contentCRC64 := crc64.Checksum(content, shared.CRC64Table) + + _, err = srcFClient.UploadRange(context.Background(), 0, rsc, nil) + _require.NoError(err) + + perms := sas.FilePermissions{Read: true, Write: true} + sasQueryParams, err := sas.SignatureValues{ + Protocol: sas.ProtocolHTTPS, // Users MUST use HTTPS (not HTTP) + ExpiryTime: time.Now().UTC().Add(48 * time.Hour), // 48-hours before expiration + ShareName: shareName, + DirectoryOrFilePath: srcFileName, + Permissions: perms.String(), + }.SignWithSharedKey(cred) + _require.NoError(err) + + srcFileSAS := srcFClient.URL() + "?" + sasQueryParams.Encode() + + destFClient := shareClient.NewRootDirectoryClient().NewFileClient("dest" + testcommon.GenerateFileName(testName)) + _, err = destFClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + uResp, err := destFClient.UploadRangeFromURL(context.Background(), srcFileSAS, 0, 0, int64(contentSize), &file.UploadRangeFromURLOptions{ + SourceContentCRC64: contentCRC64, + }) + _require.NoError(err) + _require.NotNil(uResp.XMSContentCRC64) + _require.EqualValues(binary.LittleEndian.Uint64(uResp.XMSContentCRC64), contentCRC64) + + rangeList, err := destFClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) + + cResp, err := destFClient.ClearRange(context.Background(), file.HTTPRange{Offset: 0, Count: int64(contentSize)}, nil) + _require.NoError(err) + _require.Nil(cResp.ContentMD5) + + rangeList2, err := destFClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList2.RequestID) +} + +// TODO: check why this is failing +/*func (f *FileRecordedTestsSuite) TestFileUploadRangeFromURLCopySourceAuth() { + _require := require.New(f.T()) + testName := f.T().Name() + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareName := testcommon.GenerateShareName(testName) + shareClient := testcommon.CreateNewShare(context.Background(), _require, shareName, svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 1024 * 20 + srcFileName := "src" + testcommon.GenerateFileName(testName) + srcFClient := shareClient.NewRootDirectoryClient().NewFileClient(srcFileName) + _, err = srcFClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + contentSize := 1024 * 8 // 8KB + content := make([]byte, contentSize) + body := bytes.NewReader(content) + rsc := streaming.NopCloser(body) + contentCRC64 := crc64.Checksum(content, shared.CRC64Table) + + _, err = srcFClient.UploadRange(context.Background(), 0, rsc, nil) + _require.NoError(err) + + destFClient := shareClient.NewRootDirectoryClient().NewFileClient("dest" + testcommon.GenerateFileName(testName)) + _, err = destFClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + cred, err := azidentity.NewDefaultAzureCredential(nil) + _require.NoError(err) + + // Getting token + token, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{Scopes: []string{"https://storage.azure.com/.default"}}) + _require.NoError(err) + + uResp, err := destFClient.UploadRangeFromURL(context.Background(), srcFClient.URL(), 0, 0, int64(contentSize), &file.UploadRangeFromURLOptions{ + SourceContentCRC64: contentCRC64, + CopySourceAuthorization: to.Ptr("Bearer " + token.Token), + }) + _require.NoError(err) + _require.NotNil(uResp.XMSContentCRC64) + _require.EqualValues(binary.LittleEndian.Uint64(uResp.XMSContentCRC64), contentCRC64) + + rangeList, err := destFClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) + + cResp, err := destFClient.ClearRange(context.Background(), file.HTTPRange{Offset: 0, Count: int64(contentSize)}, nil) + _require.NoError(err) + _require.Nil(cResp.ContentMD5) + + rangeList2, err := destFClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList2.RequestID) +}*/ + +func (f *FileUnrecordedTestsSuite) TestFileUploadBuffer() { + _require := require.New(f.T()) + testName := f.T().Name() + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareClient := testcommon.CreateNewShare(context.Background(), _require, testcommon.GenerateShareName(testName), svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 100 * 1024 * 1024 + fClient := shareClient.NewRootDirectoryClient().NewFileClient(testcommon.GenerateFileName(testName)) + _, err = fClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + content := make([]byte, fileSize) + _, err = rand.Read(content) + _require.NoError(err) + md5Value := md5.Sum(content) + contentMD5 := md5Value[:] + + err = fClient.UploadBuffer(context.Background(), content, &file.UploadBufferOptions{ + Concurrency: 5, + ChunkSize: 4 * 1024 * 1024, + }) + _require.NoError(err) + + dResp, err := fClient.DownloadStream(context.Background(), nil) + _require.NoError(err) + + data, err := io.ReadAll(dResp.Body) + _require.NoError(err) + + downloadedMD5Value := md5.Sum(data) + downloadedContentMD5 := downloadedMD5Value[:] + + _require.EqualValues(downloadedContentMD5, contentMD5) + + gResp2, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp2.ContentLength, fileSize) + + rangeList, err := fClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) +} + +func (f *FileUnrecordedTestsSuite) TestFileUploadFile() { + _require := require.New(f.T()) + testName := f.T().Name() + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareClient := testcommon.CreateNewShare(context.Background(), _require, testcommon.GenerateShareName(testName), svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 200 * 1024 * 1024 + fClient := shareClient.NewRootDirectoryClient().NewFileClient(testcommon.GenerateFileName(testName)) + _, err = fClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + // create local file + content := make([]byte, fileSize) + _, err = rand.Read(content) + _require.NoError(err) + err = ioutil.WriteFile("testFile", content, 0644) + _require.NoError(err) + + defer func() { + err = os.Remove("testFile") + _require.NoError(err) + }() + + fh, err := os.Open("testFile") + _require.NoError(err) + + defer func(fh *os.File) { + err := fh.Close() + _require.NoError(err) + }(fh) + + hash := md5.New() + _, err = io.Copy(hash, fh) + _require.NoError(err) + contentMD5 := hash.Sum(nil) + + err = fClient.UploadFile(context.Background(), fh, &file.UploadFileOptions{ + Concurrency: 5, + ChunkSize: 4 * 1024 * 1024, + }) + _require.NoError(err) + + dResp, err := fClient.DownloadStream(context.Background(), nil) + _require.NoError(err) + + data, err := io.ReadAll(dResp.Body) + _require.NoError(err) + + downloadedMD5Value := md5.Sum(data) + downloadedContentMD5 := downloadedMD5Value[:] + + _require.EqualValues(downloadedContentMD5, contentMD5) + + gResp2, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp2.ContentLength, fileSize) + + rangeList, err := fClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) +} + +func (f *FileUnrecordedTestsSuite) TestFileUploadStream() { + _require := require.New(f.T()) + testName := f.T().Name() + + svcClient, err := testcommon.GetServiceClient(f.T(), testcommon.TestAccountDefault, nil) + _require.NoError(err) + + shareClient := testcommon.CreateNewShare(context.Background(), _require, testcommon.GenerateShareName(testName), svcClient) + defer testcommon.DeleteShare(context.Background(), _require, shareClient) + + var fileSize int64 = 100 * 1024 * 1024 + fClient := shareClient.NewRootDirectoryClient().NewFileClient(testcommon.GenerateFileName(testName)) + _, err = fClient.Create(context.Background(), fileSize, nil) + _require.NoError(err) + + gResp, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp.ContentLength, fileSize) + + content := make([]byte, fileSize) + _, err = rand.Read(content) + _require.NoError(err) + md5Value := md5.Sum(content) + contentMD5 := md5Value[:] + + err = fClient.UploadStream(context.Background(), streaming.NopCloser(bytes.NewReader(content)), &file.UploadStreamOptions{ + Concurrency: 5, + ChunkSize: 4 * 1024 * 1024, + }) + _require.NoError(err) + + dResp, err := fClient.DownloadStream(context.Background(), nil) + _require.NoError(err) + + data, err := io.ReadAll(dResp.Body) + _require.NoError(err) + + downloadedMD5Value := md5.Sum(data) + downloadedContentMD5 := downloadedMD5Value[:] + + _require.EqualValues(downloadedContentMD5, contentMD5) + + gResp2, err := fClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(*gResp2.ContentLength, fileSize) + + rangeList, err := fClient.GetRangeList(context.Background(), nil) + _require.NoError(err) + _require.NotNil(rangeList.RequestID) +} + +// TODO: Content validation in StartCopyFromURL() after adding upload and download methods. // TODO: Add tests for upload and download methods diff --git a/sdk/storage/azfile/file/constants.go b/sdk/storage/azfile/file/constants.go index 6714af1dc69c..0498935ed4c3 100644 --- a/sdk/storage/azfile/file/constants.go +++ b/sdk/storage/azfile/file/constants.go @@ -6,7 +6,20 @@ package file -import "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" +import ( + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" +) + +const ( + _1MiB = 1024 * 1024 + + // MaxUpdateRangeBytes indicates the maximum number of bytes that can be updated in a call to Client.UploadRange. + MaxUpdateRangeBytes = 4 * 1024 * 1024 // 4MiB + + // MaxFileSize indicates the maximum size of the file allowed. + MaxFileSize = 4 * 1024 * 1024 * 1024 * 1024 // 4 TiB +) // CopyStatusType defines the states of the copy operation. type CopyStatusType = generated.CopyStatusType @@ -53,3 +66,9 @@ const ( func PossibleRangeWriteTypeValues() []RangeWriteType { return generated.PossibleFileRangeWriteTypeValues() } + +// TransferValidationType abstracts the various mechanisms used to verify a transfer. +type TransferValidationType = exported.TransferValidationType + +// TransferValidationTypeMD5 is a TransferValidationType used to provide a precomputed MD5. +type TransferValidationTypeMD5 = exported.TransferValidationTypeMD5 diff --git a/sdk/storage/azfile/file/mmf_linux.go b/sdk/storage/azfile/file/mmf_linux.go new file mode 100644 index 000000000000..93d718c8487f --- /dev/null +++ b/sdk/storage/azfile/file/mmf_linux.go @@ -0,0 +1,35 @@ +//go:build go1.18 && (linux || darwin || freebsd || openbsd || netbsd || solaris) +// +build go1.18 +// +build linux darwin freebsd openbsd netbsd solaris + +package file + +import ( + "fmt" + "os" + "syscall" +) + +// mmb is a memory mapped buffer +type mmb []byte + +// newMMB creates a new memory mapped buffer with the specified size +func newMMB(size int64) (mmb, error) { + prot, flags := syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE + addr, err := syscall.Mmap(-1, 0, int(size), prot, flags) + if err != nil { + return nil, os.NewSyscallError("Mmap", err) + } + return mmb(addr), nil +} + +// delete cleans up the memory mapped buffer +func (m *mmb) delete() { + err := syscall.Munmap(*m) + *m = nil + if err != nil { + // if we get here, there is likely memory corruption. + // please open an issue https://github.com/Azure/azure-sdk-for-go/issues + panic(fmt.Sprintf("Munmap error: %v", err)) + } +} diff --git a/sdk/storage/azfile/file/mmf_windows.go b/sdk/storage/azfile/file/mmf_windows.go new file mode 100644 index 000000000000..4ae12b639842 --- /dev/null +++ b/sdk/storage/azfile/file/mmf_windows.go @@ -0,0 +1,54 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package file + +import ( + "fmt" + "os" + "reflect" + "syscall" + "unsafe" +) + +// mmb is a memory mapped buffer +type mmb []byte + +// newMMB creates a new memory mapped buffer with the specified size +func newMMB(size int64) (mmb, error) { + const InvalidHandleValue = ^uintptr(0) // -1 + + prot, access := uint32(syscall.PAGE_READWRITE), uint32(syscall.FILE_MAP_WRITE) + hMMF, err := syscall.CreateFileMapping(syscall.Handle(InvalidHandleValue), nil, prot, uint32(size>>32), uint32(size&0xffffffff), nil) + if err != nil { + return nil, os.NewSyscallError("CreateFileMapping", err) + } + defer syscall.CloseHandle(hMMF) + + addr, err := syscall.MapViewOfFile(hMMF, access, 0, 0, uintptr(size)) + if err != nil { + return nil, os.NewSyscallError("MapViewOfFile", err) + } + + m := mmb{} + h := (*reflect.SliceHeader)(unsafe.Pointer(&m)) + h.Data = addr + h.Len = int(size) + h.Cap = h.Len + return m, nil +} + +// delete cleans up the memory mapped buffer +func (m *mmb) delete() { + addr := uintptr(unsafe.Pointer(&(([]byte)(*m)[0]))) + *m = mmb{} + err := syscall.UnmapViewOfFile(addr) + if err != nil { + // if we get here, there is likely memory corruption. + // please open an issue https://github.com/Azure/azure-sdk-for-go/issues + panic(fmt.Sprintf("UnmapViewOfFile error: %v", err)) + } +} diff --git a/sdk/storage/azfile/file/models.go b/sdk/storage/azfile/file/models.go index b0417b617d40..11f36beda260 100644 --- a/sdk/storage/azfile/file/models.go +++ b/sdk/storage/azfile/file/models.go @@ -7,10 +7,13 @@ package file import ( + "encoding/binary" + "errors" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/exported" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/shared" + "io" "time" ) @@ -324,6 +327,16 @@ type DownloadStreamOptions struct { LeaseAccessConditions *LeaseAccessConditions } +func (o *DownloadStreamOptions) format() (*generated.FileClientDownloadOptions, *LeaseAccessConditions) { + if o == nil { + return nil, nil + } + return &generated.FileClientDownloadOptions{ + Range: exported.FormatHTTPRange(o.Range), + RangeGetContentMD5: o.RangeGetContentMD5, + }, o.LeaseAccessConditions +} + // --------------------------------------------------------------------------------------------------------------------- // DownloadBufferOptions contains the optional parameters for the Client.DownloadBuffer method. @@ -335,7 +348,7 @@ type DownloadBufferOptions struct { // range, as long as the range is less than or equal to 4 MB in size. RangeGetContentMD5 *bool - // ChunkSize specifies the block size to use for each parallel download; the default size is 4MB. + // ChunkSize specifies the chunk size to use for each parallel download; the default size is 4MB. ChunkSize int64 // Progress is a function that is invoked periodically as bytes are received. @@ -344,11 +357,11 @@ type DownloadBufferOptions struct { // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions - // Concurrency indicates the maximum number of blocks to download in parallel (0=default). + // Concurrency indicates the maximum number of chunks to download in parallel (0=default). Concurrency uint16 - // RetryReaderOptionsPerBlock is used when downloading each block. - RetryReaderOptionsPerBlock RetryReaderOptions + // RetryReaderOptionsPerRange is used when downloading each chunk. + RetryReaderOptionsPerRange RetryReaderOptions } // --------------------------------------------------------------------------------------------------------------------- @@ -362,7 +375,7 @@ type DownloadFileOptions struct { // range, as long as the range is less than or equal to 4 MB in size. RangeGetContentMD5 *bool - // ChunkSize specifies the block size to use for each parallel download; the default size is 4MB. + // ChunkSize specifies the chunk size to use for each parallel download; the default size is 4MB. ChunkSize int64 // Progress is a function that is invoked periodically as bytes are received. @@ -371,11 +384,11 @@ type DownloadFileOptions struct { // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions - // Concurrency indicates the maximum number of blocks to download in parallel (0=default). + // Concurrency indicates the maximum number of chunks to download in parallel (0=default). Concurrency uint16 - // RetryReaderOptionsPerBlock is used when downloading each block. - RetryReaderOptionsPerBlock RetryReaderOptions + // RetryReaderOptionsPerRange is used when downloading each chunk. + RetryReaderOptionsPerRange RetryReaderOptions } // --------------------------------------------------------------------------------------------------------------------- @@ -406,26 +419,73 @@ func (o *ResizeOptions) format(contentLength int64) (fileAttributes string, file // UploadRangeOptions contains the optional parameters for the Client.UploadRange method. type UploadRangeOptions struct { - // An MD5 hash of the content. This hash is used to verify the integrity of the data during transport. When the Content-MD5 - // header is specified, the File service compares the hash of the content that has - // arrived with the header value that was sent. If the two hashes do not match, the operation will fail with error code 400 (Bad Request). - ContentMD5 []byte + // TransactionalValidation specifies the transfer validation type to use. + // The default is nil (no transfer validation). + TransactionalValidation TransferValidationType // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions } +func (o *UploadRangeOptions) format(offset int64, body io.ReadSeekCloser) (string, int64, *generated.FileClientUploadRangeOptions, *generated.LeaseAccessConditions, error) { + if offset < 0 || body == nil { + return "", 0, nil, nil, errors.New("invalid argument: offset must be >= 0 and body must not be nil") + } + + count, err := shared.ValidateSeekableStreamAt0AndGetCount(body) + if err != nil { + return "", 0, nil, nil, err + } + + if count == 0 { + return "", 0, nil, nil, errors.New("invalid argument: body must contain readable data whose size is > 0") + } + + httpRange := exported.FormatHTTPRange(HTTPRange{ + Offset: offset, + Count: count, + }) + rangeParam := "" + if httpRange != nil { + rangeParam = *httpRange + } + + var leaseAccessConditions *LeaseAccessConditions + uploadRangeOptions := &generated.FileClientUploadRangeOptions{} + + if o != nil { + leaseAccessConditions = o.LeaseAccessConditions + } + if o != nil && o.TransactionalValidation != nil { + body, err = o.TransactionalValidation.Apply(body, uploadRangeOptions) + if err != nil { + return "", 0, nil, nil, err + } + } + + return rangeParam, count, uploadRangeOptions, leaseAccessConditions, nil +} + // --------------------------------------------------------------------------------------------------------------------- // ClearRangeOptions contains the optional parameters for the Client.ClearRange method. type ClearRangeOptions struct { - // An MD5 hash of the content. This hash is used to verify the integrity of the data during transport. When the Content-MD5 - // header is specified, the File service compares the hash of the content that has - // arrived with the header value that was sent. If the two hashes do not match, the operation will fail with error code 400 (Bad Request). - ContentMD5 []byte // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions } +func (o *ClearRangeOptions) format(contentRange HTTPRange) (string, *generated.LeaseAccessConditions, error) { + httpRange := exported.FormatHTTPRange(contentRange) + if httpRange == nil || contentRange.Offset < 0 || contentRange.Count <= 0 { + return "", nil, errors.New("invalid argument: either offset is < 0 or count <= 0") + } + + if o == nil { + return *httpRange, nil, nil + } + + return *httpRange, o.LeaseAccessConditions, nil +} + // --------------------------------------------------------------------------------------------------------------------- // UploadRangeFromURLOptions contains the optional parameters for the Client.UploadRangeFromURL method. @@ -433,11 +493,43 @@ type UploadRangeFromURLOptions struct { // Only Bearer type is supported. Credentials should be a valid OAuth access token to copy source. CopySourceAuthorization *string // Specify the crc64 calculated for the range of bytes that must be read from the copy source. - SourceContentCRC64 []byte + SourceContentCRC64 uint64 SourceModifiedAccessConditions *SourceModifiedAccessConditions LeaseAccessConditions *LeaseAccessConditions } +func (o *UploadRangeFromURLOptions) format(sourceOffset int64, destinationOffset int64, count int64) (string, *generated.FileClientUploadRangeFromURLOptions, *generated.SourceModifiedAccessConditions, *generated.LeaseAccessConditions, error) { + if sourceOffset < 0 || destinationOffset < 0 { + return "", nil, nil, nil, errors.New("invalid argument: source and destination offsets must be >= 0") + } + + httpRangeSrc := exported.FormatHTTPRange(HTTPRange{Offset: sourceOffset, Count: count}) + httpRangeDest := exported.FormatHTTPRange(HTTPRange{Offset: destinationOffset, Count: count}) + destRange := "" + if httpRangeDest != nil { + destRange = *httpRangeDest + } + + opts := &generated.FileClientUploadRangeFromURLOptions{ + SourceRange: httpRangeSrc, + } + + var sourceModifiedAccessConditions *SourceModifiedAccessConditions + var leaseAccessConditions *LeaseAccessConditions + + if o != nil { + opts.CopySourceAuthorization = o.CopySourceAuthorization + sourceModifiedAccessConditions = o.SourceModifiedAccessConditions + leaseAccessConditions = o.LeaseAccessConditions + + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, o.SourceContentCRC64) + opts.SourceContentCRC64 = buf + } + + return destRange, opts, sourceModifiedAccessConditions, leaseAccessConditions, nil +} + // --------------------------------------------------------------------------------------------------------------------- // GetRangeListOptions contains the optional parameters for the Client.GetRangeList method. @@ -539,3 +631,64 @@ func (o *ListHandlesOptions) format() *generated.FileClientListHandlesOptions { // Handle - A listed Azure Storage handle item. type Handle = generated.Handle + +// --------------------------------------------------------------------------------------------------------------------- + +// uploadFromReaderOptions identifies options used by the UploadBuffer and UploadFile functions. +type uploadFromReaderOptions struct { + // ChunkSize specifies the chunk size to use in bytes; the default (and maximum size) is MaxUpdateRangeBytes. + ChunkSize int64 + + // Progress is a function that is invoked periodically as bytes are sent to the FileClient. + // Note that the progress reporting is not always increasing; it can go down when retrying a request. + Progress func(bytesTransferred int64) + + // Concurrency indicates the maximum number of chunks to upload in parallel (default is 5) + Concurrency uint16 + + // LeaseAccessConditions contains optional parameters to access leased entity. + LeaseAccessConditions *LeaseAccessConditions +} + +// UploadBufferOptions provides set of configurations for Client.UploadBuffer operation. +type UploadBufferOptions = uploadFromReaderOptions + +// UploadFileOptions provides set of configurations for Client.UploadFile operation. +type UploadFileOptions = uploadFromReaderOptions + +func (o *uploadFromReaderOptions) getUploadRangeOptions() *UploadRangeOptions { + return &UploadRangeOptions{ + LeaseAccessConditions: o.LeaseAccessConditions, + } +} + +// --------------------------------------------------------------------------------------------------------------------- + +// UploadStreamOptions provides set of configurations for Client.UploadStream operation. +type UploadStreamOptions struct { + // ChunkSize defines the size of the buffer used during upload. The default and minimum value is 1 MiB. + ChunkSize int64 + + // Concurrency defines the max number of concurrent uploads to be performed to upload the file. + // Each concurrent upload will create a buffer of size ChunkSize. The default value is one. + Concurrency int + + // LeaseAccessConditions contains optional parameters to access leased entity. + LeaseAccessConditions *LeaseAccessConditions +} + +func (u *UploadStreamOptions) setDefaults() { + if u.Concurrency == 0 { + u.Concurrency = 1 + } + + if u.ChunkSize < _1MiB { + u.ChunkSize = _1MiB + } +} + +func (u *UploadStreamOptions) getUploadRangeOptions() *UploadRangeOptions { + return &UploadRangeOptions{ + LeaseAccessConditions: u.LeaseAccessConditions, + } +} diff --git a/sdk/storage/azfile/file/responses.go b/sdk/storage/azfile/file/responses.go index 3101c3c9f4ff..208c38666b13 100644 --- a/sdk/storage/azfile/file/responses.go +++ b/sdk/storage/azfile/file/responses.go @@ -6,7 +6,9 @@ package file -import "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" +import ( + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" +) // CreateResponse contains the response from method Client.Create. type CreateResponse = generated.FileClientCreateResponse diff --git a/sdk/storage/azfile/go.mod b/sdk/storage/azfile/go.mod index fe84eb01a79c..943df24d78fc 100644 --- a/sdk/storage/azfile/go.mod +++ b/sdk/storage/azfile/go.mod @@ -3,17 +3,25 @@ module github.com/Azure/azure-sdk-for-go/sdk/storage/azfile go 1.18 require ( - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0 - github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 + github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 github.com/stretchr/testify v1.7.0 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.9.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dnaeon/go-vcr v1.1.0 // indirect + github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/crypto v0.7.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/sdk/storage/azfile/go.sum b/sdk/storage/azfile/go.sum index 67ee617668d0..33b86c891fa5 100644 --- a/sdk/storage/azfile/go.sum +++ b/sdk/storage/azfile/go.sum @@ -1,25 +1,44 @@ -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0 h1:VuHAcMq8pU1IWNT/m5yRaGqbK0BiQKHT8X4DTp9CHdI= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0/go.mod h1:tZoQYdDZNOiIjdSn0dVWVfl0NEPGOJqVLzSrcFk4Is0= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1 h1:Oj853U9kG+RLTCQXpjvOnrv0WaZHxgmZz1TlLywgOPY= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0 h1:rTnT/Jrcm+figWlYz4Ixzt0SJVR2cMC8lvZcimipiEY= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0/go.mod h1:ON4tFdPTwRcgWEaVDrN3584Ef+b7GgSJaXxe5fW9t4M= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2 h1:uqM+VoHjVH6zdlkLF2b6O0ZANcHoj3rO0PoQ3jglUJA= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.2/go.mod h1:twTKAa1E6hLmSDjLhaCkbTMQKc7p/rNLU40rLxGEOCI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 h1:sXr+ck84g/ZlZUOZiNELInmMgOsuGwdjjVkEIde0OtY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aovMlrjvvXoPMBVSPzk9185BT0+eZM= +github.com/AzureAD/microsoft-authentication-library-for-go v0.9.0 h1:UE9n9rkJF62ArLb1F3DEjRt8O3jLwMWdSoypKV4f3MU= +github.com/AzureAD/microsoft-authentication-library-for-go v0.9.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= +github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= +github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdk/storage/azfile/internal/exported/transfer_validation_option.go b/sdk/storage/azfile/internal/exported/transfer_validation_option.go new file mode 100644 index 000000000000..ae8df1ea0def --- /dev/null +++ b/sdk/storage/azfile/internal/exported/transfer_validation_option.go @@ -0,0 +1,28 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package exported + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/internal/generated" + "io" +) + +// TransferValidationType abstracts the various mechanisms used to verify a transfer. +type TransferValidationType interface { + Apply(io.ReadSeekCloser, generated.TransactionalContentSetter) (io.ReadSeekCloser, error) + notPubliclyImplementable() +} + +// TransferValidationTypeMD5 is a TransferValidationType used to provide a precomputed MD5. +type TransferValidationTypeMD5 []byte + +func (c TransferValidationTypeMD5) Apply(rsc io.ReadSeekCloser, cfg generated.TransactionalContentSetter) (io.ReadSeekCloser, error) { + cfg.SetMD5(c) + return rsc, nil +} + +func (TransferValidationTypeMD5) notPubliclyImplementable() {} diff --git a/sdk/storage/azfile/internal/generated/models.go b/sdk/storage/azfile/internal/generated/models.go new file mode 100644 index 000000000000..6450b7de2e82 --- /dev/null +++ b/sdk/storage/azfile/internal/generated/models.go @@ -0,0 +1,25 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package generated + +type TransactionalContentSetter interface { + SetMD5([]byte) + // add SetCRC64() when Azure File service starts supporting it. +} + +func (f *FileClientUploadRangeOptions) SetMD5(v []byte) { + f.ContentMD5 = v +} + +type SourceContentSetter interface { + SetSourceContentCRC64(v []byte) + // add SetSourceContentMD5() when Azure File service starts supporting it. +} + +func (f *FileClientUploadRangeFromURLOptions) SetSourceContentCRC64(v []byte) { + f.SourceContentCRC64 = v +} diff --git a/sdk/storage/azfile/internal/shared/batch_transfer.go b/sdk/storage/azfile/internal/shared/batch_transfer.go new file mode 100644 index 000000000000..ec5541bfbb13 --- /dev/null +++ b/sdk/storage/azfile/internal/shared/batch_transfer.go @@ -0,0 +1,77 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +package shared + +import ( + "context" + "errors" +) + +// BatchTransferOptions identifies options used by doBatchTransfer. +type BatchTransferOptions struct { + TransferSize int64 + ChunkSize int64 + Concurrency uint16 + Operation func(ctx context.Context, offset int64, chunkSize int64) error + OperationName string +} + +// DoBatchTransfer helps to execute operations in a batch manner. +// Can be used by users to customize batch works (for other scenarios that the SDK does not provide) +func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error { + if o.ChunkSize == 0 { + return errors.New("ChunkSize cannot be 0") + } + + if o.Concurrency == 0 { + o.Concurrency = 5 // default concurrency + } + + // Prepare and do parallel operations. + numChunks := uint16(((o.TransferSize - 1) / o.ChunkSize) + 1) + operationChannel := make(chan func() error, o.Concurrency) // Create the channel that release 'concurrency' goroutines concurrently + operationResponseChannel := make(chan error, numChunks) // Holds each response + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Create the goroutines that process each operation (in parallel). + for g := uint16(0); g < o.Concurrency; g++ { + //grIndex := g + go func() { + for f := range operationChannel { + err := f() + operationResponseChannel <- err + } + }() + } + + // Add each chunk's operation to the channel. + for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { + curChunkSize := o.ChunkSize + + if chunkNum == numChunks-1 { // Last chunk + curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total + } + offset := int64(chunkNum) * o.ChunkSize + operationChannel <- func() error { + return o.Operation(ctx, offset, curChunkSize) + } + } + close(operationChannel) + + // Wait for the operations to complete. + var firstErr error = nil + for chunkNum := uint16(0); chunkNum < numChunks; chunkNum++ { + responseError := <-operationResponseChannel + // record the first error (the original error which should cause the other chunks to fail with canceled context) + if responseError != nil && firstErr == nil { + cancel() // As soon as any operation fails, cancel all remaining operation calls + firstErr = responseError + } + } + return firstErr +} diff --git a/sdk/storage/azfile/internal/shared/shared.go b/sdk/storage/azfile/internal/shared/shared.go index 830f5dc512da..9ef2a3396816 100644 --- a/sdk/storage/azfile/internal/shared/shared.go +++ b/sdk/storage/azfile/internal/shared/shared.go @@ -11,6 +11,8 @@ import ( "fmt" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" + "hash/crc64" + "io" "net" "strings" ) @@ -36,6 +38,10 @@ const ( const StorageAnalyticsVersion = "1.0" +const crc64Polynomial uint64 = 0x9A6C9329AC4BC9B5 + +var CRC64Table = crc64.MakeTable(crc64Polynomial) + const ( // DefaultFilePermissionString is a constant for all intents and purposes. // Inherit inherits permissions from the parent folder (default when creating files/folders) @@ -164,3 +170,40 @@ func GenerateLeaseID(leaseID *string) (*string, error) { } return leaseID, nil } + +func ValidateSeekableStreamAt0AndGetCount(body io.ReadSeeker) (int64, error) { + if body == nil { // nil body is "logically" seekable to 0 and are 0 bytes long + return 0, nil + } + + err := validateSeekableStreamAt0(body) + if err != nil { + return 0, err + } + + count, err := body.Seek(0, io.SeekEnd) + if err != nil { + return 0, errors.New("body stream must be seekable") + } + + _, err = body.Seek(0, io.SeekStart) + if err != nil { + return 0, err + } + return count, nil +} + +// return an error if body is not a valid seekable stream at 0 +func validateSeekableStreamAt0(body io.ReadSeeker) error { + if body == nil { // nil body is "logically" seekable to 0 + return nil + } + if pos, err := body.Seek(0, io.SeekCurrent); pos != 0 || err != nil { + // Help detect programmer error + if err != nil { + return errors.New("body stream must be seekable") + } + return errors.New("body stream must be set to position 0") + } + return nil +}