Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sdk/storage/azdatalake/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@
## 1.1.2 (Unreleased)

### Features Added
* Append API Bundled with Flush functionality
* HNS Encryption Scope support
* Append API with acquire lease, release lease and renewal of lease support.
* Flush API bundled with release lease option.
* HNS Encryption Context support
* Pagination Support for recursive directory deletion
* Bundle ability to set permission, owner, group, acl, lease, expiry time and umask along with FileSystem.CreateFile and FileSystem.CreateDirectory APIs.
* Added support for AAD Audience when OAuth is used.
* Updated service version to `2023-11-03`
* Integrate `InsecureAllowCredentialWithHTTP` client options.

### Breaking Changes

### Bugs Fixed
* Fixed an issue where GetSASURL() was providing HTTPS SAS, instead of the default http+https SAS. Fixes [#22448](https://github.com/Azure/azure-sdk-for-go/issues/22448)

### Other Changes
* Updated azcore version to `1.11.1`

## 1.1.1 (2024-02-29)

Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azdatalake/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "go",
"TagPrefix": "go/storage/azdatalake",
"Tag": "go/storage/azdatalake_6e4e5b7c87"
"Tag": "go/storage/azdatalake_8cf0ce4c24"
}
14 changes: 10 additions & 4 deletions sdk/storage/azdatalake/directory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ type Client base.CompositeClient[generated.PathClient, generated_blob.BlobClient
func NewClient(directoryURL string, cred azcore.TokenCredential, options *ClientOptions) (*Client, error) {
blobURL, directoryURL := shared.GetURLs(directoryURL)

authPolicy := runtime.NewBearerTokenPolicy(cred, []string{shared.TokenScope}, nil)
audience := base.GetAudience((*base.ClientOptions)(options))
conOptions := shared.GetClientOptions(options)
authPolicy := shared.NewStorageChallengePolicy(cred, audience, conOptions.InsecureAllowCredentialWithHTTP)
plOpts := runtime.PipelineOptions{
PerRetry: []policy.Policy{authPolicy},
}
Expand Down Expand Up @@ -262,9 +263,14 @@ func (d *Client) Create(ctx context.Context, options *CreateOptions) (CreateResp
// Delete deletes directory and any path under it.
func (d *Client) Delete(ctx context.Context, options *DeleteOptions) (DeleteResponse, error) {
lac, mac, deleteOpts := path.FormatDeleteOptions(options, true)
resp, err := d.generatedDirClientWithDFS().Delete(ctx, deleteOpts, lac, mac)
err = exported.ConvertToDFSError(err)
return resp, err
for {
resp, err := d.generatedDirClientWithDFS().Delete(ctx, deleteOpts, lac, mac)
if resp.Continuation == nil || err != nil {
err = exported.ConvertToDFSError(err)
return resp, err
}
deleteOpts.Continuation = resp.Continuation
}
}

// GetProperties gets the properties of a directory.
Expand Down
142 changes: 142 additions & 0 deletions sdk/storage/azdatalake/directory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ package directory_test

import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
"net/http"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -767,6 +769,76 @@ func (s *RecordedTestSuite) TestDeleteDirWithNilAccessConditions() {
_require.NotNil(resp)
}

// To run this test, the NamespaceTenant AAD info needs to be set to an AAD app that does not have any RBAC permissions,
// and entityId needs to be set to the entity ID of the application.
func (s *RecordedTestSuite) TestDeleteDirWithPaginatedDelete() {

s.T().Skip("AAD app not configured for this test, this will be skipped")
_require := require.New(s.T())
testName := s.T().Name()
user := "user"
readWriteExecutePermission := "rwx"

objectId := "" // object ID of an AAD app which has no RBAC permissions
accountName, accountKey := testcommon.GetGenericAccountInfo(testcommon.TestAccountDatalake)

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

rootDirectory := fsClient.NewDirectoryClient("/")

dirName := testcommon.GenerateDirName(testName)
dirURL := "https://" + accountName + ".dfs.core.windows.net/" + filesystemName + "/" + dirName
credential, err := azdatalake.NewSharedKeyCredential(accountName, accountKey)
_require.NoError(err)

dirClient, err := directory.NewClientWithSharedKeyCredential(dirURL, credential, nil)
_require.NoError(err)

resp, err := dirClient.Create(context.Background(), nil)
_require.NoError(err)
_require.NotNil(resp)

for i := 0; i < 5020; i++ {
fileClient, err := dirClient.NewFileClient(testcommon.GenerateFileName(testName) + strconv.Itoa(i))
_require.NoError(err)
_require.NotNil(fileClient)

_, err = fileClient.Create(context.Background(), nil)
_require.NoError(err)
}

accessControlResp, err := rootDirectory.GetAccessControl(context.Background(), nil)
_require.NoError(err)

newAcl := *accessControlResp.ACL + "," + user + ":" + objectId + ":" + readWriteExecutePermission

_, err = rootDirectory.SetAccessControlRecursive(context.Background(), newAcl, nil)
_require.NoError(err)

cred, err := testcommon.GetGenericTokenCredential()
_require.NoError(err)

directoryURL := "https://" + accountName + ".dfs.core.windows.net/" + filesystemName + "/" + dirName

newDirClient, err := directory.NewClient(directoryURL, cred, nil)
_require.NoError(err)

deleteOpts := &directory.DeleteOptions{
Paginated: to.Ptr(true),
}

response, err := newDirClient.Delete(context.Background(), deleteOpts)
_require.NoError(err)
_require.Nil(response.Continuation)
_require.NotNil(response)
}

func (s *RecordedTestSuite) TestDeleteDirIfModifiedSinceTrue() {
_require := require.New(s.T())
testName := s.T().Name()
Expand Down Expand Up @@ -2850,3 +2922,73 @@ func (s *UnrecordedTestSuite) TestDirCreateDeleteUsingOAuth() {
_, err = dirClient.GetProperties(context.Background(), nil)
_require.NoError(err)
}

func (s *RecordedTestSuite) TestCreateDirectoryClientDefaultAudience() {
_require := require.New(s.T())
testName := s.T().Name()

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

cred, err := testcommon.GetGenericTokenCredential()
_require.NoError(err)

accountName, _ := testcommon.GetGenericAccountInfo(testcommon.TestAccountDatalake)
_require.Greater(len(accountName), 0)

dirName := testcommon.GenerateDirName(testName)
dirURL := "https://" + accountName + ".dfs.core.windows.net/" + filesystemName + "/" + dirName

options := &directory.ClientOptions{Audience: "https://storage.azure.com/"}
testcommon.SetClientOptions(s.T(), &options.ClientOptions)

dirClient, err := directory.NewClient(dirURL, cred, options)
_require.NoError(err)

_, err = dirClient.Create(context.Background(), nil)
_require.NoError(err)

_, err = dirClient.GetProperties(context.Background(), nil)
_require.NoError(err)

}

func (s *RecordedTestSuite) TestCreateDirectoryClientCustomAudience() {
_require := require.New(s.T())
testName := s.T().Name()

filesystemName := testcommon.GenerateFileSystemName(testName)
fsClient, err := testcommon.GetFileSystemClient(filesystemName, s.T(), testcommon.TestAccountDatalake, nil)
_require.NoError(err)
defer testcommon.DeleteFileSystem(context.Background(), _require, fsClient)

_, err = fsClient.Create(context.Background(), nil)
_require.NoError(err)

cred, err := testcommon.GetGenericTokenCredential()
_require.NoError(err)

accountName, _ := testcommon.GetGenericAccountInfo(testcommon.TestAccountDatalake)
_require.Greater(len(accountName), 0)

dirName := testcommon.GenerateDirName(testName)
dirURL := "https://" + accountName + ".dfs.core.windows.net/" + filesystemName + "/" + dirName

options := &directory.ClientOptions{Audience: "https://" + accountName + ".blob.core.windows.net"}
testcommon.SetClientOptions(s.T(), &options.ClientOptions)

dirClient, err := directory.NewClient(dirURL, cred, options)
_require.NoError(err)

_, err = dirClient.Create(context.Background(), nil)
_require.NoError(err)

_, err = dirClient.GetProperties(context.Background(), nil)
_require.NoError(err)

}
42 changes: 32 additions & 10 deletions sdk/storage/azdatalake/file/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type Client base.CompositeClient[generated.PathClient, generated_blob.BlobClient
// - options - client options; pass nil to accept the default values
func NewClient(fileURL string, cred azcore.TokenCredential, options *ClientOptions) (*Client, error) {
blobURL, fileURL := shared.GetURLs(fileURL)
authPolicy := runtime.NewBearerTokenPolicy(cred, []string{shared.TokenScope}, nil)
audience := base.GetAudience((*base.ClientOptions)(options))
conOptions := shared.GetClientOptions(options)
authPolicy := shared.NewStorageChallengePolicy(cred, audience, conOptions.InsecureAllowCredentialWithHTTP)
plOpts := runtime.PipelineOptions{
PerRetry: []policy.Policy{authPolicy},
}
Expand Down Expand Up @@ -411,13 +412,6 @@ func (f *Client) AppendData(ctx context.Context, offset int64, body io.ReadSeekC
return AppendDataResponse{}, err
}
resp, err := f.generatedFileClientWithDFS().AppendData(ctx, body, appendDataOptions, nil, leaseAccessConditions, cpkInfo)
// TODO: check and uncomment this
//if err != nil {
// _, err1 := body.Seek(0, io.SeekStart)
// if err1 != nil {
// return AppendDataResponse{}, err1
// }
//}
return resp, exported.ConvertToDFSError(err)
}

Expand Down Expand Up @@ -451,6 +445,13 @@ func (f *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, actua
}
}

if o.EncryptionContext != nil {
_, err := f.Create(ctx, &CreateOptions{EncryptionContext: o.EncryptionContext})
if err != nil {
return err
}
}

progress := int64(0)
progressLock := &sync.Mutex{}

Expand Down Expand Up @@ -490,6 +491,12 @@ func (f *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, actua
})

if err != nil {
if o.EncryptionContext != nil {
_, err2 := f.Delete(ctx, nil)
if err2 != nil {
return exported.ConvertToDFSError(err2)
}
}
return exported.ConvertToDFSError(err)
}
// All appends were successful, call to flush
Expand Down Expand Up @@ -527,7 +534,20 @@ func (f *Client) UploadStream(ctx context.Context, body io.Reader, options *Uplo
options = &UploadStreamOptions{}
}

if options.EncryptionContext != nil {
_, err := f.Create(ctx, &CreateOptions{EncryptionContext: options.EncryptionContext})
if err != nil {
return err
}
}
err := copyFromReader(ctx, body, f, *options, newMMBPool)

if err != nil && options.EncryptionContext != nil {
_, err2 := f.Delete(ctx, nil)
if err2 != nil {
return exported.ConvertToDFSError(err2)
}
}
return exported.ConvertToDFSError(err)
}

Expand All @@ -538,8 +558,10 @@ func (f *Client) DownloadStream(ctx context.Context, o *DownloadStreamOptions) (
o = &DownloadStreamOptions{}
}
opts := o.format()
resp, err := f.blobClient().DownloadStream(ctx, opts)
newResp := FormatDownloadStreamResponse(&resp)
var respFromCtx *http.Response
ctxWithResp := shared.WithCaptureBlobResponse(ctx, &respFromCtx)
resp, err := f.blobClient().DownloadStream(ctxWithResp, opts)
newResp := FormatDownloadStreamResponse(&resp, respFromCtx)
fullResp := DownloadStreamResponse{
client: f,
DownloadResponse: newResp,
Expand Down
Loading