Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/storage/azdatalake/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Release History

## 1.1.2 (Unreleased)

### Features Added
* Append API with acquire lease, release lease and renewal of lease support.
* Flush API bundled with release lease option.

### Breaking Changes

## 1.1.1 (2024-02-29)

### Bugs Fixed
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azdatalake/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "go",
"TagPrefix": "go/storage/azdatalake",
"Tag": "go/storage/azdatalake_9248522dce"
"Tag": "go/storage/azdatalake_9d160e2359"
}
7 changes: 0 additions & 7 deletions sdk/storage/azdatalake/file/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,6 @@ func (f *Client) AppendData(ctx context.Context, offset int64, body io.ReadSeekC
return AppendDataResponse{}, err
}
resp, err := f.generatedFileClientWithDFS().AppendData(ctx, body, appendDataOptions, nil, leaseAccessConditions, cpkInfo)
// TODO: check and uncomment this
//if err != nil {
// _, err1 := body.Seek(0, io.SeekStart)
// if err1 != nil {
// return AppendDataResponse{}, err1
// }
//}
return resp, exported.ConvertToDFSError(err)
}

Expand Down
163 changes: 161 additions & 2 deletions sdk/storage/azdatalake/file/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"encoding/binary"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/path"
"hash/crc64"
"io"
"math/rand"
Expand Down Expand Up @@ -3461,7 +3463,7 @@ func (s *RecordedTestSuite) TestDownloadDataContentMD5() {
_require.Equal(resp1.ContentMD5, mdf[:])
}

func (s *RecordedTestSuite) TestFileAppendWithFlushOption() {
func (s *RecordedTestSuite) TestFileAppendDataWithAcquireLease() {
_require := require.New(s.T())
testName := s.T().Name()

Expand All @@ -3484,15 +3486,172 @@ func (s *RecordedTestSuite) TestFileAppendWithFlushOption() {

contentSize := 1024 * 8 // 8KB
rsc, _ := testcommon.GenerateData(contentSize)

opts := &file.AppendDataOptions{
LeaseAction: &file.LeaseActionAcquire,
LeaseDuration: to.Ptr(int64(15)),
ProposedLeaseID: proposedLeaseIDs[1],
}
_, err = srcFClient.AppendData(context.Background(), 0, rsc, opts)
_require.NoError(err)

gResp2, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeLeased, *gResp2.LeaseState)

time.Sleep(time.Second * 15)

//Check if the lease was acquired for the right duration
gResp, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeExpired, *gResp.LeaseState)
}

func (s *RecordedTestSuite) TestFileAppendDataWithRenewLease() {
_require := require.New(s.T())
testName := s.T().Name()

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

srcFileName := "src" + testcommon.GenerateFileName(testName)

srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)

createOpts := file.CreateOptions{
ProposedLeaseID: proposedLeaseIDs[0],
LeaseDuration: to.Ptr(int64(15)),
}

resp, err := srcFClient.Create(context.Background(), &createOpts)
_require.NoError(err)
_require.NotNil(resp)

gResp2, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeLeased, *gResp2.LeaseState)

//Wait for 15 seconds for lease to expire
time.Sleep(15 * time.Second)

gResp, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeExpired, *gResp.LeaseState)

contentSize := 1024 * 8 // 8KB
rsc, _ := testcommon.GenerateData(contentSize)

opts := &file.AppendDataOptions{
LeaseAction: &file.LeaseActionRenew,
LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]},
LeaseDuration: to.Ptr(int64(-1)),
Comment thread
souravgupta-msft marked this conversation as resolved.
}
_, err = srcFClient.AppendData(context.Background(), 0, rsc, opts)
_require.NoError(err)

gResp2, err = srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeLeased, *gResp2.LeaseState)
}

func (s *RecordedTestSuite) TestFileAppendDataWithReleaseLease() {
_require := require.New(s.T())
testName := s.T().Name()

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

srcFileName := "src" + testcommon.GenerateFileName(testName)

srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)

createOpts := file.CreateOptions{
ProposedLeaseID: proposedLeaseIDs[0],
LeaseDuration: to.Ptr(int64(15)),
}

resp, err := srcFClient.Create(context.Background(), &createOpts)
_require.NoError(err)
_require.NotNil(resp)

contentSize := 1024 * 8 // 8KB
rsc, _ := testcommon.GenerateData(contentSize)

opts := &file.AppendDataOptions{
Flush: to.Ptr(true),
LeaseAction: &file.LeaseActionRelease,
LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]},
Flush: to.Ptr(true),
}

_, err = srcFClient.AppendData(context.Background(), 0, rsc, opts)
_require.NoError(err)

gResp, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(lease.StateTypeAvailable, *gResp.LeaseState)
}

func (s *RecordedTestSuite) TestFileAppendWithFlushReleaseLease() {
_require := require.New(s.T())
testName := s.T().Name()

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

srcFileName := "src" + testcommon.GenerateFileName(testName)

srcFClient, err := testcommon.GetFileClient(filesystemName, srcFileName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)

createOpts := file.CreateOptions{
ProposedLeaseID: proposedLeaseIDs[0],
LeaseDuration: to.Ptr(int64(15)),
}

resp, err := srcFClient.Create(context.Background(), &createOpts)
_require.NoError(err)
_require.NotNil(resp)

contentSize := 1024 * 8 // 8KB
rsc, _ := testcommon.GenerateData(contentSize)

_, err = srcFClient.AppendData(context.Background(), 0, rsc,
&file.AppendDataOptions{
LeaseAccessConditions: &file.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]},
})
_require.NoError(err)

opts := &file.FlushDataOptions{
LeaseAction: &file.LeaseActionRelease,
AccessConditions: &path.AccessConditions{
LeaseAccessConditions: &path.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]},
},
}

_, err = srcFClient.FlushData(context.Background(), int64(contentSize), opts)
_require.NoError(err)

gResp2, err := srcFClient.GetProperties(context.Background(), nil)
_require.NoError(err)
_require.Equal(*gResp2.ContentLength, int64(contentSize))
_require.Equal(lease.StateTypeAvailable, *gResp2.LeaseState)
}

func (s *RecordedTestSuite) TestFileAppendAndFlushData() {
Expand Down
10 changes: 10 additions & 0 deletions sdk/storage/azdatalake/file/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,13 @@ const (
StateTypeBreaking StateType = azdatalake.StateTypeBreaking
StateTypeBroken StateType = azdatalake.StateTypeBroken
)

// LeaseAction Describes actions that can be performed on a lease.
type LeaseAction = path.LeaseAction

var (
LeaseActionAcquire LeaseAction = path.LeaseActionAcquire
LeaseActionRelease LeaseAction = path.LeaseActionRelease
LeaseActionAcquireRelease LeaseAction = path.LeaseActionAcquireRelease
LeaseActionRenew LeaseAction = path.LeaseActionRenew
)
41 changes: 41 additions & 0 deletions sdk/storage/azdatalake/file/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/path"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/internal/shared"
"hash/crc64"
"io"
Expand Down Expand Up @@ -387,3 +389,42 @@ func Example_file_AppendAndFlushDataWithValidation() {
handleError(err)
fmt.Println(*gResp2.ContentLength, int64(contentSize))
}

func Example_file_AppendAndFlushDataWithAcquireAndReleaseLease() {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT_NAME"), os.Getenv("AZURE_STORAGE_ACCOUNT_KEY")

// Create a file client
u := fmt.Sprintf("https://%s.dfs.core.windows.net/fs/file.txt", accountName)
credential, err := azdatalake.NewSharedKeyCredential(accountName, accountKey)
handleError(err)
fClient, err := file.NewClientWithSharedKeyCredential(u, credential, nil)
handleError(err)

contentSize := 1024 * 8 // 8KB
content := make([]byte, contentSize)
body := bytes.NewReader(content)
rsc := streaming.NopCloser(body)

// Acquire lease during append data
opts := &file.AppendDataOptions{
LeaseAction: &file.LeaseActionAcquire,
LeaseDuration: to.Ptr(int64(15)),
ProposedLeaseID: proposedLeaseIDs[1],
}
_, err = fClient.AppendData(context.Background(), 0, rsc, opts)
handleError(err)

_, err = fClient.FlushData(context.Background(), int64(contentSize), &file.FlushDataOptions{
LeaseAction: &file.LeaseActionRelease,
AccessConditions: &path.AccessConditions{
LeaseAccessConditions: &path.LeaseAccessConditions{LeaseID: proposedLeaseIDs[0]},
},
})
handleError(err)

gResp2, err := fClient.GetProperties(context.Background(), nil)
handleError(err)
// Check if the lease is released
fmt.Println(lease.StateTypeAvailable, *gResp2.LeaseState)

}
36 changes: 30 additions & 6 deletions sdk/storage/azdatalake/file/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ type FlushDataOptions struct {
// RetainUncommittedData if "true", uncommitted data is retained after the flush operation
// completes, otherwise, the uncommitted data is deleted after the flush operation.
RetainUncommittedData *bool
// LeaseAction Describes actions that can be performed on a lease.
LeaseAction *LeaseAction
// LeaseDuration specifies the duration of the lease, in seconds, or negative one
// (-1) for a lease that never expires. A non-infinite lease can be between 15 and 60 seconds.
LeaseDuration *int64
// ProposedLeaseID specifies the proposed lease ID for the file.
ProposedLeaseID *string
}

func (o *FlushDataOptions) format(offset int64) (*generated.PathClientFlushDataOptions, *generated.ModifiedAccessConditions, *generated.LeaseAccessConditions, *generated.PathHTTPHeaders, *generated.CPKInfo, error) {
Expand Down Expand Up @@ -230,6 +237,9 @@ func (o *FlushDataOptions) format(offset int64) (*generated.PathClientFlushDataO
cpkInfoOpts.EncryptionKeySHA256 = o.CPKInfo.EncryptionKeySHA256
cpkInfoOpts.EncryptionAlgorithm = o.CPKInfo.EncryptionAlgorithm
}
flushDataOpts.LeaseAction = o.LeaseAction
flushDataOpts.LeaseDuration = o.LeaseDuration
flushDataOpts.ProposedLeaseID = o.ProposedLeaseID
}
return flushDataOpts, modifiedAccessConditions, leaseAccessConditions, httpHeaderOpts, cpkInfoOpts, nil
}
Expand All @@ -241,13 +251,22 @@ type AppendDataOptions struct {
TransactionalValidation TransferValidationType
// LeaseAccessConditions contains optional parameters to access leased entity.
LeaseAccessConditions *LeaseAccessConditions
// LeaseAction describes actions that can be performed on a lease.
LeaseAction *LeaseAction
// LeaseDuration specifies the duration of the lease, in seconds, or negative one
// (-1) for a lease that never expires. A non-infinite lease can be between 15 and 60 seconds.
LeaseDuration *int64
// ProposedLeaseID specifies the proposed lease ID for the file.
ProposedLeaseID *string
// CPKInfo contains optional parameters to perform encryption using customer-provided key.
CPKInfo *CPKInfo
//Flush Optional. If true, the file will be flushed after append.
Flush *bool
}

func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*generated.PathClientAppendDataOptions, *generated.LeaseAccessConditions, *generated.CPKInfo, error) {
func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*generated.PathClientAppendDataOptions,
*generated.LeaseAccessConditions, *generated.CPKInfo, error) {

if offset < 0 || body == nil {
return nil, nil, nil, errors.New("invalid argument: offset must be >= 0 and body must not be nil")
}
Expand Down Expand Up @@ -282,12 +301,17 @@ func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*gener
cpkInfoOpts.EncryptionKeySHA256 = o.CPKInfo.EncryptionKeySHA256
cpkInfoOpts.EncryptionAlgorithm = o.CPKInfo.EncryptionAlgorithm
}

appendDataOptions.LeaseAction = o.LeaseAction
appendDataOptions.LeaseDuration = o.LeaseDuration
appendDataOptions.ProposedLeaseID = o.ProposedLeaseID
appendDataOptions.Flush = o.Flush
}
if o != nil && o.TransactionalValidation != nil {
_, err = o.TransactionalValidation.Apply(body, appendDataOptions)
if err != nil {
return nil, nil, nil, err

if o.TransactionalValidation != nil {
_, err = o.TransactionalValidation.Apply(body, appendDataOptions)
if err != nil {
return nil, nil, nil, err
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions sdk/storage/azdatalake/internal/path/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,12 @@ const (
StateTypeBreaking StateType = azdatalake.StateTypeBreaking
StateTypeBroken StateType = azdatalake.StateTypeBroken
)

type LeaseAction = generated.LeaseAction

const (
LeaseActionAcquire = generated.LeaseActionAcquire
LeaseActionRelease = generated.LeaseActionRelease
LeaseActionAcquireRelease = generated.LeaseActionAcquireRelease
LeaseActionRenew = generated.LeaseActionAutoRenew
)