diff --git a/sdk/storage/azdatalake/CHANGELOG.md b/sdk/storage/azdatalake/CHANGELOG.md index bd88e5627462..81ea8529ef3a 100644 --- a/sdk/storage/azdatalake/CHANGELOG.md +++ b/sdk/storage/azdatalake/CHANGELOG.md @@ -1,5 +1,13 @@ # Release History +## 1.1.2 (Unreleased) + +### Features Added +* Append API with acquire lease, release lease and renewal of lease support. +* Flush API bundled with release lease option. + +### Breaking Changes + ## 1.1.1 (2024-02-29) ### Bugs Fixed diff --git a/sdk/storage/azdatalake/assets.json b/sdk/storage/azdatalake/assets.json index 71e63fe6e1e3..a15e5dbe8505 100644 --- a/sdk/storage/azdatalake/assets.json +++ b/sdk/storage/azdatalake/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "go", "TagPrefix": "go/storage/azdatalake", - "Tag": "go/storage/azdatalake_9248522dce" + "Tag": "go/storage/azdatalake_9d160e2359" } diff --git a/sdk/storage/azdatalake/file/client.go b/sdk/storage/azdatalake/file/client.go index 75dbd8fccbfb..2724ce67c7b2 100644 --- a/sdk/storage/azdatalake/file/client.go +++ b/sdk/storage/azdatalake/file/client.go @@ -411,13 +411,6 @@ func (f *Client) AppendData(ctx context.Context, offset int64, body io.ReadSeekC return AppendDataResponse{}, err } resp, err := f.generatedFileClientWithDFS().AppendData(ctx, body, appendDataOptions, nil, leaseAccessConditions, cpkInfo) - // TODO: check and uncomment this - //if err != nil { - // _, err1 := body.Seek(0, io.SeekStart) - // if err1 != nil { - // return AppendDataResponse{}, err1 - // } - //} return resp, exported.ConvertToDFSError(err) } diff --git a/sdk/storage/azdatalake/file/client_test.go b/sdk/storage/azdatalake/file/client_test.go index 5d9e93ab2d28..00de8adbf399 100644 --- a/sdk/storage/azdatalake/file/client_test.go +++ b/sdk/storage/azdatalake/file/client_test.go @@ -13,6 +13,8 @@ import ( "encoding/binary" "fmt" "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/path" "hash/crc64" "io" "math/rand" @@ -3461,7 +3463,7 @@ func (s *RecordedTestSuite) TestDownloadDataContentMD5() { _require.Equal(resp1.ContentMD5, mdf[:]) } -func (s *RecordedTestSuite) TestFileAppendWithFlushOption() { +func (s *RecordedTestSuite) TestFileAppendDataWithAcquireLease() { _require := require.New(s.T()) testName := s.T().Name() @@ -3484,15 +3486,172 @@ func (s *RecordedTestSuite) TestFileAppendWithFlushOption() { contentSize := 1024 * 8 // 8KB rsc, _ := testcommon.GenerateData(contentSize) + + opts := &file.AppendDataOptions{ + LeaseAction: &file.LeaseActionAcquire, + LeaseDuration: to.Ptr(int64(15)), + ProposedLeaseID: proposedLeaseIDs[1], + } + _, err = srcFClient.AppendData(context.Background(), 0, rsc, opts) + _require.NoError(err) + + gResp2, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeLeased, *gResp2.LeaseState) + + time.Sleep(time.Second * 15) + + //Check if the lease was acquired for the right duration + gResp, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeExpired, *gResp.LeaseState) +} + +func (s *RecordedTestSuite) TestFileAppendDataWithRenewLease() { + _require := require.New(s.T()) + testName := s.T().Name() + + filesystemName := testcommon.GenerateFileSystemName(testName) + fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient) + + _, err = fsClient.Create(context.Background(), nil) + _require.NoError(err) + + srcFileName := "src" + testcommon.GenerateFileName(testName) + + srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + + createOpts := file.CreateOptions{ + ProposedLeaseID: proposedLeaseIDs[0], + LeaseDuration: to.Ptr(int64(15)), + } + + resp, err := srcFClient.Create(context.Background(), &createOpts) + _require.NoError(err) + _require.NotNil(resp) + + gResp2, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeLeased, *gResp2.LeaseState) + + //Wait for 15 seconds for lease to expire + time.Sleep(15 * time.Second) + + gResp, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeExpired, *gResp.LeaseState) + + contentSize := 1024 * 8 // 8KB + rsc, _ := testcommon.GenerateData(contentSize) + + opts := &file.AppendDataOptions{ + LeaseAction: &file.LeaseActionRenew, + LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]}, + LeaseDuration: to.Ptr(int64(-1)), + } + _, err = srcFClient.AppendData(context.Background(), 0, rsc, opts) + _require.NoError(err) + + gResp2, err = srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeLeased, *gResp2.LeaseState) +} + +func (s *RecordedTestSuite) TestFileAppendDataWithReleaseLease() { + _require := require.New(s.T()) + testName := s.T().Name() + + filesystemName := testcommon.GenerateFileSystemName(testName) + fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient) + + _, err = fsClient.Create(context.Background(), nil) + _require.NoError(err) + + srcFileName := "src" + testcommon.GenerateFileName(testName) + + srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + + createOpts := file.CreateOptions{ + ProposedLeaseID: proposedLeaseIDs[0], + LeaseDuration: to.Ptr(int64(15)), + } + + resp, err := srcFClient.Create(context.Background(), &createOpts) + _require.NoError(err) + _require.NotNil(resp) + + contentSize := 1024 * 8 // 8KB + rsc, _ := testcommon.GenerateData(contentSize) + opts := &file.AppendDataOptions{ - Flush: to.Ptr(true), + LeaseAction: &file.LeaseActionRelease, + LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]}, + Flush: to.Ptr(true), } + _, err = srcFClient.AppendData(context.Background(), 0, rsc, opts) _require.NoError(err) + gResp, err := srcFClient.GetProperties(context.Background(), nil) + _require.NoError(err) + _require.Equal(lease.StateTypeAvailable, *gResp.LeaseState) +} + +func (s *RecordedTestSuite) TestFileAppendWithFlushReleaseLease() { + _require := require.New(s.T()) + testName := s.T().Name() + + filesystemName := testcommon.GenerateFileSystemName(testName) + fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient) + + _, err = fsClient.Create(context.Background(), nil) + _require.NoError(err) + + srcFileName := "src" + testcommon.GenerateFileName(testName) + + srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil) + _require.NoError(err) + + createOpts := file.CreateOptions{ + ProposedLeaseID: proposedLeaseIDs[0], + LeaseDuration: to.Ptr(int64(15)), + } + + resp, err := srcFClient.Create(context.Background(), &createOpts) + _require.NoError(err) + _require.NotNil(resp) + + contentSize := 1024 * 8 // 8KB + rsc, _ := testcommon.GenerateData(contentSize) + + _, err = srcFClient.AppendData(context.Background(), 0, rsc, + &file.AppendDataOptions{ + LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]}, + }) + _require.NoError(err) + + opts := &file.FlushDataOptions{ + LeaseAction: &file.LeaseActionRelease, + AccessConditions: &path.AccessConditions{ + LeaseAccessConditions: &path.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]}, + }, + } + + _, err = srcFClient.FlushData(context.Background(), int64(contentSize), opts) + _require.NoError(err) + gResp2, err := srcFClient.GetProperties(context.Background(), nil) _require.NoError(err) _require.Equal(*gResp2.ContentLength, int64(contentSize)) + _require.Equal(lease.StateTypeAvailable, *gResp2.LeaseState) } func (s *RecordedTestSuite) TestFileAppendAndFlushData() { diff --git a/sdk/storage/azdatalake/file/constants.go b/sdk/storage/azdatalake/file/constants.go index 77729179d3d4..357472077845 100644 --- a/sdk/storage/azdatalake/file/constants.go +++ b/sdk/storage/azdatalake/file/constants.go @@ -124,3 +124,13 @@ const ( StateTypeBreaking StateType = azdatalake.StateTypeBreaking StateTypeBroken StateType = azdatalake.StateTypeBroken ) + +// LeaseAction Describes actions that can be performed on a lease. +type LeaseAction = path.LeaseAction + +var ( + LeaseActionAcquire LeaseAction = path.LeaseActionAcquire + LeaseActionRelease LeaseAction = path.LeaseActionRelease + LeaseActionAcquireRelease LeaseAction = path.LeaseActionAcquireRelease + LeaseActionRenew LeaseAction = path.LeaseActionRenew +) diff --git a/sdk/storage/azdatalake/file/examples_test.go b/sdk/storage/azdatalake/file/examples_test.go index 213adfb3f173..0bb43c557216 100644 --- a/sdk/storage/azdatalake/file/examples_test.go +++ b/sdk/storage/azdatalake/file/examples_test.go @@ -14,9 +14,11 @@ import ( "fmt" "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/path" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/shared" "hash/crc64" "io" @@ -387,3 +389,42 @@ func Example_file_AppendAndFlushDataWithValidation() { handleError(err) fmt.Println(*gResp2.ContentLength, int64(contentSize)) } + +func Example_file_AppendAndFlushDataWithAcquireAndReleaseLease() { + accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT_NAME"), os.Getenv("AZURE_STORAGE_ACCOUNT_KEY") + + // Create a file client + u := fmt.Sprintf("https://%s.dfs.core.windows.net/fs/file.txt", accountName) + credential, err := azdatalake.NewSharedKeyCredential(accountName, accountKey) + handleError(err) + fClient, err := file.NewClientWithSharedKeyCredential(u, credential, nil) + handleError(err) + + contentSize := 1024 * 8 // 8KB + content := make([]byte, contentSize) + body := bytes.NewReader(content) + rsc := streaming.NopCloser(body) + + // Acquire lease during append data + opts := &file.AppendDataOptions{ + LeaseAction: &file.LeaseActionAcquire, + LeaseDuration: to.Ptr(int64(15)), + ProposedLeaseID: proposedLeaseIDs[1], + } + _, err = fClient.AppendData(context.Background(), 0, rsc, opts) + handleError(err) + + _, err = fClient.FlushData(context.Background(), int64(contentSize), &file.FlushDataOptions{ + LeaseAction: &file.LeaseActionRelease, + AccessConditions: &path.AccessConditions{ + LeaseAccessConditions: &path.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]}, + }, + }) + handleError(err) + + gResp2, err := fClient.GetProperties(context.Background(), nil) + handleError(err) + // Check if the lease is released + fmt.Println(lease.StateTypeAvailable, *gResp2.LeaseState) + +} diff --git a/sdk/storage/azdatalake/file/models.go b/sdk/storage/azdatalake/file/models.go index 7ffba3b11a31..1f9eac9d12d9 100644 --- a/sdk/storage/azdatalake/file/models.go +++ b/sdk/storage/azdatalake/file/models.go @@ -185,6 +185,13 @@ type FlushDataOptions struct { // RetainUncommittedData if "true", uncommitted data is retained after the flush operation // completes, otherwise, the uncommitted data is deleted after the flush operation. RetainUncommittedData *bool + // LeaseAction Describes actions that can be performed on a lease. + LeaseAction *LeaseAction + // LeaseDuration specifies the duration of the lease, in seconds, or negative one + // (-1) for a lease that never expires. A non-infinite lease can be between 15 and 60 seconds. + LeaseDuration *int64 + // ProposedLeaseID specifies the proposed lease ID for the file. + ProposedLeaseID *string } func (o *FlushDataOptions) format(offset int64) (*generated.PathClientFlushDataOptions, *generated.ModifiedAccessConditions, *generated.LeaseAccessConditions, *generated.PathHTTPHeaders, *generated.CPKInfo, error) { @@ -230,6 +237,9 @@ func (o *FlushDataOptions) format(offset int64) (*generated.PathClientFlushDataO cpkInfoOpts.EncryptionKeySHA256 = o.CPKInfo.EncryptionKeySHA256 cpkInfoOpts.EncryptionAlgorithm = o.CPKInfo.EncryptionAlgorithm } + flushDataOpts.LeaseAction = o.LeaseAction + flushDataOpts.LeaseDuration = o.LeaseDuration + flushDataOpts.ProposedLeaseID = o.ProposedLeaseID } return flushDataOpts, modifiedAccessConditions, leaseAccessConditions, httpHeaderOpts, cpkInfoOpts, nil } @@ -241,13 +251,22 @@ type AppendDataOptions struct { TransactionalValidation TransferValidationType // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions + // LeaseAction describes actions that can be performed on a lease. + LeaseAction *LeaseAction + // LeaseDuration specifies the duration of the lease, in seconds, or negative one + // (-1) for a lease that never expires. A non-infinite lease can be between 15 and 60 seconds. + LeaseDuration *int64 + // ProposedLeaseID specifies the proposed lease ID for the file. + ProposedLeaseID *string // CPKInfo contains optional parameters to perform encryption using customer-provided key. CPKInfo *CPKInfo //Flush Optional. If true, the file will be flushed after append. Flush *bool } -func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*generated.PathClientAppendDataOptions, *generated.LeaseAccessConditions, *generated.CPKInfo, error) { +func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*generated.PathClientAppendDataOptions, + *generated.LeaseAccessConditions, *generated.CPKInfo, error) { + if offset < 0 || body == nil { return nil, nil, nil, errors.New("invalid argument: offset must be >= 0 and body must not be nil") } @@ -282,12 +301,17 @@ func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*gener cpkInfoOpts.EncryptionKeySHA256 = o.CPKInfo.EncryptionKeySHA256 cpkInfoOpts.EncryptionAlgorithm = o.CPKInfo.EncryptionAlgorithm } + + appendDataOptions.LeaseAction = o.LeaseAction + appendDataOptions.LeaseDuration = o.LeaseDuration + appendDataOptions.ProposedLeaseID = o.ProposedLeaseID appendDataOptions.Flush = o.Flush - } - if o != nil && o.TransactionalValidation != nil { - _, err = o.TransactionalValidation.Apply(body, appendDataOptions) - if err != nil { - return nil, nil, nil, err + + if o.TransactionalValidation != nil { + _, err = o.TransactionalValidation.Apply(body, appendDataOptions) + if err != nil { + return nil, nil, nil, err + } } } diff --git a/sdk/storage/azdatalake/internal/path/constants.go b/sdk/storage/azdatalake/internal/path/constants.go index 0744d15e9205..6062851c4e78 100644 --- a/sdk/storage/azdatalake/internal/path/constants.go +++ b/sdk/storage/azdatalake/internal/path/constants.go @@ -73,3 +73,12 @@ const ( StateTypeBreaking StateType = azdatalake.StateTypeBreaking StateTypeBroken StateType = azdatalake.StateTypeBroken ) + +type LeaseAction = generated.LeaseAction + +const ( + LeaseActionAcquire = generated.LeaseActionAcquire + LeaseActionRelease = generated.LeaseActionRelease + LeaseActionAcquireRelease = generated.LeaseActionAcquireRelease + LeaseActionRenew = generated.LeaseActionAutoRenew +)