Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/containers/azcontainerregistry/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "go",
"TagPrefix": "go/containers/azcontainerregistry",
"Tag": "go/containers/azcontainerregistry_5bce238ccf"
"Tag": "go/containers/azcontainerregistry_9579d04096"
}
74 changes: 58 additions & 16 deletions sdk/containers/azcontainerregistry/blob_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ package azcontainerregistry
import (
"bytes"
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
"io"
"net/http"
"strconv"
"strings"
"testing"
)

const alpineBlobDigest = "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"

func TestBlobClient_CancelUpload(t *testing.T) {
startRecording(t)
endpoint, cred, options := getEndpointCredAndClientOptions(t)
Expand Down Expand Up @@ -44,10 +52,9 @@ func TestBlobClient_CheckBlobExists(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.CheckBlobExists(ctx, "alpine", digest, nil)
res, err := client.CheckBlobExists(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
require.Equal(t, digest, *res.DockerContentDigest)
require.Equal(t, alpineBlobDigest, *res.DockerContentDigest)
}

func TestBlobClient_CheckBlobExists_fail(t *testing.T) {
Expand Down Expand Up @@ -76,8 +83,7 @@ func TestBlobClient_CheckChunkExists(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.CheckChunkExists(ctx, "alpine", digest, "bytes=0-299", nil)
res, err := client.CheckChunkExists(ctx, "alpine", alpineBlobDigest, "bytes=0-299", nil)
require.NoError(t, err)
require.NotEmpty(t, *res.ContentLength)
}
Expand Down Expand Up @@ -108,8 +114,7 @@ func TestBlobClient_completeUpload_wrongDigest(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
blob, err := io.ReadAll(getRes.BlobData)
require.NoError(t, err)
Expand All @@ -127,8 +132,7 @@ func TestBlobClient_DeleteBlob(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
_, err = client.DeleteBlob(ctx, "alpine", digest, nil)
_, err = client.DeleteBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
}

Expand Down Expand Up @@ -158,10 +162,32 @@ func TestBlobClient_GetBlob(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.GetBlob(ctx, "alpine", digest, nil)
res, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
require.NotEmpty(t, *res.ContentLength)
reader, err := NewDigestValidationReader(alpineBlobDigest, res.BlobData)
require.NoError(t, err)
_, err = io.ReadAll(reader)
require.NoError(t, err)
}

// TestBlobClient_GetBlob_wrongDigest verifies that reading blob data through a
// DigestValidationReader constructed with a digest that does not match the
// payload fails with ErrMismatchedHash.
func TestBlobClient_GetBlob_wrongDigest(t *testing.T) {
	srv, closeServer := mock.NewServer()
	defer closeServer()
	// The mock server returns a fixed body; its real digest is irrelevant
	// because the reader below is given a deliberately wrong digest.
	srv.AppendResponse(mock.WithStatusCode(http.StatusOK), mock.WithBody([]byte("test")))

	pl := runtime.NewPipeline(moduleName, moduleVersion, runtime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
	client := &BlobClient{
		srv.URL(),
		pl,
	}
	ctx := context.Background()
	resp, err := client.GetBlob(ctx, "name", "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", nil)
	require.NoError(t, err)
	reader, err := NewDigestValidationReader("sha256:wrong", resp.BlobData)
	require.NoError(t, err)
	_, err = io.ReadAll(reader)
	// require.Error(t, err, ErrMismatchedHash) only treated ErrMismatchedHash
	// as a log message; ErrorIs actually asserts the error's identity.
	require.ErrorIs(t, err, ErrMismatchedHash)
}

func TestBlobClient_GetBlob_fail(t *testing.T) {
Expand Down Expand Up @@ -190,10 +216,27 @@ func TestBlobClient_GetChunk(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.GetChunk(ctx, "alpine", digest, "bytes=0-999", nil)
chunkSize := 1000
current := 0
blob := bytes.NewBuffer(nil)
for {
res, err := client.GetChunk(ctx, "alpine", alpineBlobDigest, fmt.Sprintf("bytes=%d-%d", current, current+chunkSize-1), nil)
require.NoError(t, err)
chunk, err := io.ReadAll(res.ChunkData)
require.NoError(t, err)
_, err = blob.Write(chunk)
require.NoError(t, err)
totalSize, _ := strconv.Atoi(strings.Split(*res.ContentRange, "/")[1])
currentRangeEnd, _ := strconv.Atoi(strings.Split(strings.Split(*res.ContentRange, "/")[0], "-")[1])
if totalSize == currentRangeEnd+1 {
break
}
current += chunkSize
}
reader, err := NewDigestValidationReader(alpineBlobDigest, blob)
require.NoError(t, err)
_, err = io.ReadAll(reader)
require.NoError(t, err)
require.Equal(t, int64(1000), *res.ContentLength)
}

func TestBlobClient_GetChunk_fail(t *testing.T) {
Expand Down Expand Up @@ -247,8 +290,7 @@ func TestBlobClient_MountBlob(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.MountBlob(ctx, "hello-world", "alpine", digest, nil)
res, err := client.MountBlob(ctx, "hello-world", "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
require.NotEmpty(t, res.Location)
}
Expand Down
66 changes: 3 additions & 63 deletions sdk/containers/azcontainerregistry/blob_custom_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ package azcontainerregistry

import (
"context"
"crypto/sha256"
"encoding"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"hash"
"io"
"reflect"
)
Expand Down Expand Up @@ -61,59 +58,6 @@ func NewBlobClient(endpoint string, credential azcore.TokenCredential, options *
}, nil
}

// BlobDigestCalculator incrementally computes the digest of a blob as it is
// uploaded, and can snapshot/rewind its hash state so a failed chunk upload
// can be retried without corrupting the running digest.
// Don't use this type directly, use NewBlobDigestCalculator() instead.
type BlobDigestCalculator struct {
	h         hash.Hash // running SHA-256 over all uploaded bytes
	hashState []byte    // last snapshot of h's internal state; nil until saveState is called
}

// wrappedReadSeeker combines an arbitrary Reader with a separate Seeker so a
// tee'd read stream can still be rewound via the underlying seekable source.
type wrappedReadSeeker struct {
	io.Reader
	io.Seeker
}

// NewBlobDigestCalculator creates a new calculator to help to calculate blob digest when uploading blob.
func NewBlobDigestCalculator() *BlobDigestCalculator {
	return &BlobDigestCalculator{h: sha256.New()}
}

// saveState snapshots the hash's internal state. sha256's marshaler cannot
// fail, so the error is deliberately discarded.
func (c *BlobDigestCalculator) saveState() {
	state, _ := c.h.(encoding.BinaryMarshaler).MarshalBinary()
	c.hashState = state
}

// restoreState rewinds the hash to the last snapshot; it is a no-op when no
// snapshot has been taken yet.
func (c *BlobDigestCalculator) restoreState() {
	if c.hashState != nil {
		_ = c.h.(encoding.BinaryUnmarshaler).UnmarshalBinary(c.hashState)
	}
}

// newLimitTeeReader returns a Reader that writes to w what it reads from r with n bytes limit.
func newLimitTeeReader(r io.Reader, w io.Writer, n int64) io.Reader {
return &limitTeeReader{r, w, n}
}

type limitTeeReader struct {
r io.Reader
w io.Writer
n int64
}

func (lt *limitTeeReader) Read(p []byte) (int, error) {
n, err := lt.r.Read(p)
if n > 0 && lt.n > 0 {
wn, werr := lt.w.Write(p[:n])
if werr != nil {
return wn, werr
}
lt.n -= int64(wn)
}
return n, err
}

// BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
type BlobClientUploadChunkOptions struct {
// Start of range for the blob to be uploaded.
Expand All @@ -130,15 +74,11 @@ type BlobClientUploadChunkOptions struct {
// - options - BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
func (client *BlobClient) UploadChunk(ctx context.Context, location string, chunkData io.ReadSeeker, blobDigestCalculator *BlobDigestCalculator, options *BlobClientUploadChunkOptions) (BlobClientUploadChunkResponse, error) {
blobDigestCalculator.saveState()
size, err := chunkData.Seek(0, io.SeekEnd) // Seek to the end to get the stream's size
if err != nil {
return BlobClientUploadChunkResponse{}, err
}
_, err = chunkData.Seek(0, io.SeekStart)
reader, err := blobDigestCalculator.wrapReader(chunkData)
if err != nil {
return BlobClientUploadChunkResponse{}, err
}
wrappedChunkData := &wrappedReadSeeker{Reader: newLimitTeeReader(chunkData, blobDigestCalculator.h, size), Seeker: chunkData}
wrappedChunkData := &wrappedReadSeeker{Reader: reader, Seeker: chunkData}
var requestOptions *blobClientUploadChunkOptions
if options != nil && options.RangeStart != nil && options.RangeEnd != nil {
requestOptions = &blobClientUploadChunkOptions{ContentRange: to.Ptr(fmt.Sprintf("%d-%d", *options.RangeStart, *options.RangeEnd))}
Expand All @@ -157,5 +97,5 @@ func (client *BlobClient) UploadChunk(ctx context.Context, location string, chun
// - blobDigestCalculator - Calculator that help to calculate blob digest
// - options - BlobClientCompleteUploadOptions contains the optional parameters for the BlobClient.CompleteUpload method.
func (client *BlobClient) CompleteUpload(ctx context.Context, location string, blobDigestCalculator *BlobDigestCalculator, options *BlobClientCompleteUploadOptions) (BlobClientCompleteUploadResponse, error) {
return client.completeUpload(ctx, fmt.Sprintf("sha256:%x", blobDigestCalculator.h.Sum(nil)), location, options)
return client.completeUpload(ctx, blobDigestCalculator.getDigest(), location, options)
}
55 changes: 24 additions & 31 deletions sdk/containers/azcontainerregistry/blob_custom_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ func TestBlobClient_CompleteUpload(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
blob, err := io.ReadAll(getRes.BlobData)
require.NoError(t, err)
Expand All @@ -49,8 +48,7 @@ func TestBlobClient_UploadChunk(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
blob, err := io.ReadAll(getRes.BlobData)
require.NoError(t, err)
Expand All @@ -70,24 +68,34 @@ func TestBlobClient_CompleteUpload_uploadByChunk(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
blob, err := io.ReadAll(getRes.BlobData)
require.NoError(t, err)
startRes, err := client.StartUpload(ctx, "hello-world", nil)
require.NoError(t, err)
calculator := NewBlobDigestCalculator()
oriReader := bytes.NewReader(blob)
firstPart := io.NewSectionReader(oriReader, int64(0), int64(len(blob)/2))
secondPart := io.NewSectionReader(oriReader, int64(len(blob)/2), int64(len(blob)-len(blob)/2))
uploadResp, err := client.UploadChunk(ctx, *startRes.Location, firstPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(0)), RangeEnd: to.Ptr(int32(len(blob)/2 - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
uploadResp, err = client.UploadChunk(ctx, *uploadResp.Location, secondPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(len(blob) / 2)), RangeEnd: to.Ptr(int32(len(blob) - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
completeResp, err := client.CompleteUpload(ctx, *uploadResp.Location, calculator, nil)
size := int64(len(blob))
chunkSize := int64(736)
current := int64(0)
location := *startRes.Location
for {
end := current + chunkSize
if end > size {
end = size
}
chunkReader := io.NewSectionReader(oriReader, current, end-current)
uploadResp, err := client.UploadChunk(ctx, location, chunkReader, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(current)), RangeEnd: to.Ptr(int32(end - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
location = *uploadResp.Location
current = end
if current >= size {
break
}
}
completeResp, err := client.CompleteUpload(ctx, location, calculator, nil)
require.NoError(t, err)
require.NotEmpty(t, *completeResp.DockerContentDigest)
}
Expand All @@ -103,28 +111,13 @@ func TestNewBlobClient(t *testing.T) {
require.Errorf(t, err, "provided Cloud field is missing Azure Container Registry configuration")
}

// TestBlobDigestCalculator_saveAndRestoreState checks that saveState/restoreState
// round-trip the running hash: restoring rewinds the digest to the snapshot,
// and calling restoreState before any snapshot is a harmless no-op.
func TestBlobDigestCalculator_saveAndRestoreState(t *testing.T) {
	calc := NewBlobDigestCalculator()
	// Exercise the no-op paths: restore with no snapshot, then an empty
	// save/restore cycle.
	calc.restoreState()
	calc.saveState()
	calc.restoreState()
	// Feed some data and snapshot the resulting state.
	calc.h.Write([]byte("test1"))
	sum := calc.h.Sum(nil)
	calc.saveState()
	// Advance the hash past the snapshot; the digest must change...
	calc.h.Write([]byte("test2"))
	require.NotEqual(t, sum, calc.h.Sum(nil))
	// ...and restoring must bring it back to the snapshotted digest.
	calc.restoreState()
	require.Equal(t, sum, calc.h.Sum(nil))
}

func TestBlobClient_CompleteUpload_uploadByChunkFailOver(t *testing.T) {
startRecording(t)
endpoint, cred, options := getEndpointCredAndClientOptions(t)
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
require.NoError(t, err)
blob, err := io.ReadAll(getRes.BlobData)
require.NoError(t, err)
Expand Down
Loading