From c1d7978f48bf13d486764a8b28dc746f117b4aa6 Mon Sep 17 00:00:00 2001 From: Soulou Date: Tue, 23 Jul 2019 18:30:29 +0200 Subject: [PATCH 1/5] Add simple object storage package for s3 and swift --- Gopkg.lock | 48 ++++++++++++ storage/s3.go | 135 ++++++++++++++++++++++++++++++++ storage/s3_test.go | 104 ++++++++++++++++++++++++ storage/s3mock/s3client_mock.go | 71 +++++++++++++++++ storage/swift.go | 122 +++++++++++++++++++++++++++++ 5 files changed, 480 insertions(+) create mode 100644 storage/s3.go create mode 100644 storage/s3_test.go create mode 100644 storage/s3mock/s3client_mock.go create mode 100644 storage/swift.go diff --git a/Gopkg.lock b/Gopkg.lock index cfacb0ba..be8578fc 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -25,6 +25,32 @@ pruneopts = "" revision = "233adcd05fe2aa08c6f7449c26a5530726f05207" +[[projects]] + digest = "1:3f72590f47228b9bbad42ebfca58f61b4920d73b42a0280560645fffa3a454ec" + name = "github.com/aws/aws-sdk-go-v2" + packages = [ + "aws", + "aws/awserr", + "aws/defaults", + "aws/endpoints", + "aws/signer/v4", + "internal/awsutil", + "internal/sdk", + "private/protocol", + "private/protocol/query", + "private/protocol/query/queryutil", + "private/protocol/rest", + "private/protocol/restxml", + "private/protocol/xml", + "private/protocol/xml/xmlutil", + "service/s3", + "service/s3/s3iface", + "service/s3/s3manager", + ] + pruneopts = "" + revision = "098e15df3044cf1b04a222c1c33c3e6135ac89f3" + version = "v0.9.0" + [[projects]] digest = "1:e2d8cad4cf0c1b724c97f4cb1f01cbb9531bcc07b73c5ff3fe575cf6f8a2f75f" name = "github.com/coreos/etcd" @@ -137,6 +163,13 @@ revision = "89e084a80fb1e0bf5e7d38038e3367f821fdf3d7" version = "v1.5.3" +[[projects]] + digest = "1:13fe471d0ed891e8544eddfeeb0471fd3c9f2015609a1c000aefdedf52a19d40" + name = "github.com/jmespath/go-jmespath" + packages = ["."] + pruneopts = "" + revision = "c2b33e84" + [[projects]] branch = "master" digest = "1:448b4a6e39e46d8740b00dc871f26d58dc39341b160e01267b7917132831a136" @@ -145,6 +178,14 @@ pruneopts = "" revision = "b729f2633dfe35f4d1d8a32385f6685610ce1cb5" +[[projects]] + digest = "1:c8cbf7532bf2beace82ee1a180dc367f74a90f66b8653dc4443517711597a376" + name = "github.com/ncw/swift" + packages = ["."] + pruneopts = "" + revision = "24e3012fc8a71f004a6455bce2088031d50bf2b6" + version = "v1.0.47" + [[projects]] digest = "1:7a69f6a3a33929f8b66aa39c93868ad1698f06417fe627ae067559beb94504bd" name = "github.com/nsqio/go-nsq" @@ -339,12 +380,19 @@ input-imports = [ "github.com/Scalingo/go-etcd-cron", "github.com/Scalingo/logrus-rollbar", + "github.com/aws/aws-sdk-go-v2/aws", + "github.com/aws/aws-sdk-go-v2/aws/awserr", + "github.com/aws/aws-sdk-go-v2/aws/defaults", + "github.com/aws/aws-sdk-go-v2/aws/endpoints", + "github.com/aws/aws-sdk-go-v2/service/s3", + "github.com/aws/aws-sdk-go-v2/service/s3/s3manager", "github.com/coreos/etcd/clientv3", "github.com/coreos/etcd/pkg/transport", "github.com/facebookgo/grace/gracenet", "github.com/gofrs/uuid", "github.com/golang/mock/gomock", "github.com/influxdata/influxdb/client/v2", + "github.com/ncw/swift", "github.com/nsqio/go-nsq", "github.com/pkg/errors", "github.com/rollbar/rollbar-go", diff --git a/storage/s3.go b/storage/s3.go new file mode 100644 index 00000000..27c0c781 --- /dev/null +++ b/storage/s3.go @@ -0,0 +1,135 @@ +package storage + +import ( + "context" + "io" + "time" + + "github.com/Scalingo/go-utils/logger" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/awserr" + "github.com/aws/aws-sdk-go-v2/aws/defaults" + "github.com/aws/aws-sdk-go-v2/aws/endpoints" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/s3manager" + "github.com/pkg/errors" +) + +const ( + NotFoundErrCode = "NotFound" +) + +type S3Client interface { + GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest + HeadObjectRequest(input *s3.HeadObjectInput) s3.HeadObjectRequest + DeleteObjectRequest(input *s3.DeleteObjectInput) s3.DeleteObjectRequest +} + +type S3Config struct { + AK string + SK string + Region string + Endpoint string + Bucket string +} + +type S3 struct { + cfg S3Config + s3client S3Client + s3uploader *s3manager.Uploader + retryWaitDuration time.Duration + retryAttempts int +} + +func NewS3(cfg S3Config, bucket string) *S3 { + s3config := s3Config(cfg) + return &S3{ + cfg: cfg, s3client: s3.New(s3config), s3uploader: s3manager.NewUploader(s3config), + retryWaitDuration: time.Second, retryAttempts: 3, + } +} + +func (s *S3) Get(ctx context.Context, path string) (io.ReadCloser, error) { + log := logger.Get(ctx) + log.WithField("path", path).Info("Get object") + + input := &s3.GetObjectInput{ + Bucket: &s.cfg.Bucket, + Key: &path, + } + out, err := s.s3client.GetObjectRequest(input).Send(ctx) + if err != nil { + return nil, errors.Wrapf(err, "fail to get object %v", path) + } + return out.Body, nil +} + +func (s *S3) Upload(ctx context.Context, file io.Reader, path string) error { + input := &s3manager.UploadInput{ + Body: file, + Bucket: &s.cfg.Bucket, + Key: &path, + } + _, err := s.s3uploader.UploadWithContext(ctx, input) + if err != nil { + return errors.Wrapf(err, "fail to save file to %v", path) + } + + return nil +} + +// Size returns the size of the content of the object. A retry mecanism is +// implemented because of the eventual consistency of S3 backends NotFound +// error are sometimes returned when the object was just uploaded. +func (s *S3) Size(ctx context.Context, path string) (int64, error) { + var ( + err error + stat *s3.HeadObjectResponse + log = logger.Get(ctx) + input = &s3.HeadObjectInput{Bucket: &s.cfg.Bucket, Key: &path} + ) + for i := 0; i < s.retryAttempts; i++ { + stat, err = s.s3client.HeadObjectRequest(input).Send(ctx) + if err == nil { + return *stat.ContentLength, nil + } + + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == NotFoundErrCode { + log.WithField("key", path).Info("[s3] retry HEAD") + time.Sleep(s.retryWaitDuration) + continue + } + return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path) + } + + return -1, errors.Wrapf(err, "fail to HEAD object '%v' after %v retries", path, s.retryAttempts) +} + +func (s *S3) Delete(ctx context.Context, path string) error { + input := &s3.DeleteObjectInput{Bucket: &s.cfg.Bucket, Key: &path} + req := s.s3client.DeleteObjectRequest(input) + _, err := req.Send(ctx) + if err != nil { + return errors.Wrapf(err, "fail to delete object %v", path) + } + + return nil +} + +func s3Config(cfg S3Config) aws.Config { + credentials := aws.NewStaticCredentialsProvider(cfg.AK, cfg.SK, "") + config := aws.Config{ + Region: cfg.Region, + Handlers: defaults.Handlers(), + HTTPClient: defaults.HTTPClient(), + Credentials: credentials, + EndpointResolver: aws.ResolveWithEndpoint(aws.Endpoint{ + URL: "https://" + cfg.Endpoint, + SigningRegion: cfg.Endpoint, + }), + } + if cfg.Endpoint == "" { + config.EndpointResolver = endpoints.NewDefaultResolver() + } + return config +} diff --git a/storage/s3_test.go b/storage/s3_test.go new file mode 100644 index 00000000..601ee5a5 --- /dev/null +++ b/storage/s3_test.go @@ -0,0 +1,104 @@ +package storage + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/Scalingo/go-utils/storage/s3mock" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Mock 404 NotFound error from AWS API +type KeyNotFoundErr struct{} + +func (err KeyNotFoundErr) Code() string { + return NotFoundErrCode +} + +func (err KeyNotFoundErr) Error() string { + return err.Code() +} + +func (err KeyNotFoundErr) Message() string { + return err.Error() +} + +func (err KeyNotFoundErr) OrigErr() error { + return err +} + +func TestS3_Size(t *testing.T) { + cases := map[string]struct { + expectMock func(t *testing.T, m *s3mock.MockS3Client) + err string + }{ + "it should make a HEAD request on the object": { + expectMock: func(t *testing.T, m *s3mock.MockS3Client) { + m.EXPECT().HeadObjectRequest(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), Key: aws.String("/key"), + }).Return(s3.HeadObjectRequest{Request: &aws.Request{ + // Mandatory to create an empty request, otherwise it panics + HTTPRequest: new(http.Request), + Data: &s3.HeadObjectOutput{ContentLength: aws.Int64(10)}, + }}) + }, + }, + "it should retry if the first HEAD request return 404": { + expectMock: func(t *testing.T, m *s3mock.MockS3Client) { + m.EXPECT().HeadObjectRequest(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), Key: aws.String("/key"), + }).Return(s3.HeadObjectRequest{Request: &aws.Request{ + HTTPRequest: new(http.Request), + Error: KeyNotFoundErr{}, + }}) + + m.EXPECT().HeadObjectRequest(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), Key: aws.String("/key"), + }).Return(s3.HeadObjectRequest{Request: &aws.Request{ + // Mandatory to create an empty request, otherwise it panics + HTTPRequest: new(http.Request), + Data: &s3.HeadObjectOutput{ContentLength: aws.Int64(10)}, + }}) + }, + }, + "it should fail if the max amount of retried is passed": { + expectMock: func(t *testing.T, m *s3mock.MockS3Client) { + m.EXPECT().HeadObjectRequest(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), Key: aws.String("/key"), + }).Return(s3.HeadObjectRequest{Request: &aws.Request{ + HTTPRequest: new(http.Request), + Error: KeyNotFoundErr{}, + }}).Times(3) + }, + err: "NotFound", + }, + } + for title, c := range cases { + t.Run(title, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mock := s3mock.NewMockS3Client(ctrl) + storage := &S3{ + cfg: S3Config{Bucket: "bucket"}, + s3client: mock, + retryAttempts: 3, retryWaitDuration: 50 * time.Millisecond, + } + + c.expectMock(t, mock) + _, err := storage.Size(context.Background(), "/key") + if c.err != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), c.err) + return + } + require.NoError(t, err) + }) + } +} diff --git a/storage/s3mock/s3client_mock.go b/storage/s3mock/s3client_mock.go new file mode 100644 index 00000000..3625372b --- /dev/null +++ b/storage/s3mock/s3client_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/Scalingo/appsdeck-database/controller/backup/storage/s3 (interfaces: S3Client) + +// Package s3mock is a generated GoMock package. +package s3mock + +import ( + reflect "reflect" + + s3 "github.com/aws/aws-sdk-go-v2/service/s3" + gomock "github.com/golang/mock/gomock" +) + +// MockS3Client is a mock of S3Client interface +type MockS3Client struct { + ctrl *gomock.Controller + recorder *MockS3ClientMockRecorder +} + +// MockS3ClientMockRecorder is the mock recorder for MockS3Client +type MockS3ClientMockRecorder struct { + mock *MockS3Client +} + +// NewMockS3Client creates a new mock instance +func NewMockS3Client(ctrl *gomock.Controller) *MockS3Client { + mock := &MockS3Client{ctrl: ctrl} + mock.recorder = &MockS3ClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockS3Client) EXPECT() *MockS3ClientMockRecorder { + return m.recorder +} + +// DeleteObjectRequest mocks base method +func (m *MockS3Client) DeleteObjectRequest(arg0 *s3.DeleteObjectInput) s3.DeleteObjectRequest { + ret := m.ctrl.Call(m, "DeleteObjectRequest", arg0) + ret0, _ := ret[0].(s3.DeleteObjectRequest) + return ret0 +} + +// DeleteObjectRequest indicates an expected call of DeleteObjectRequest +func (mr *MockS3ClientMockRecorder) DeleteObjectRequest(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjectRequest", reflect.TypeOf((*MockS3Client)(nil).DeleteObjectRequest), arg0) +} + +// GetObjectRequest mocks base method +func (m *MockS3Client) GetObjectRequest(arg0 *s3.GetObjectInput) s3.GetObjectRequest { + ret := m.ctrl.Call(m, "GetObjectRequest", arg0) + ret0, _ := ret[0].(s3.GetObjectRequest) + return ret0 +} + +// GetObjectRequest indicates an expected call of GetObjectRequest +func (mr *MockS3ClientMockRecorder) GetObjectRequest(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectRequest", reflect.TypeOf((*MockS3Client)(nil).GetObjectRequest), arg0) +} + +// HeadObjectRequest mocks base method +func (m *MockS3Client) HeadObjectRequest(arg0 *s3.HeadObjectInput) s3.HeadObjectRequest { + ret := m.ctrl.Call(m, "HeadObjectRequest", arg0) + ret0, _ := ret[0].(s3.HeadObjectRequest) + return ret0 +} + +// HeadObjectRequest indicates an expected call of HeadObjectRequest +func (mr *MockS3ClientMockRecorder) HeadObjectRequest(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObjectRequest", reflect.TypeOf((*MockS3Client)(nil).HeadObjectRequest), arg0) +} diff --git a/storage/swift.go b/storage/swift.go new file mode 100644 index 00000000..51a895d9 --- /dev/null +++ b/storage/swift.go @@ -0,0 +1,122 @@ +package storage + +import ( + "context" + "crypto/rand" + "crypto/sha1" + "encoding/hex" + "io" + "strings" + + "github.com/Scalingo/go-utils/logger" + "github.com/ncw/swift" + "github.com/pkg/errors" +) + +const contentType = "application/octet-stream" + +type SwiftConfig struct { + Username string + Password string + AuthURL string + Region string + Tenant string + Prefix string + Container string + ChunkSize int64 +} + +type Swift struct { + cfg SwiftConfig + conn *swift.Connection +} + +func NewSwift(cfg SwiftConfig) *Swift { + conn := &swift.Connection{ + UserName: cfg.Username, + ApiKey: cfg.Password, + AuthUrl: cfg.AuthURL, + Region: cfg.Region, + Tenant: cfg.Tenant, + } + return &Swift{cfg: cfg, conn: conn} +} + +func (s *Swift) Get(ctx context.Context, path string) (io.ReadCloser, error) { + path = s.fullPath(path) + log := logger.Get(ctx) + log.WithField("path", path).Info("Get object") + + object, _, err := s.conn.ObjectOpen(s.cfg.Container, path, false, swift.Headers{}) + if err != nil { + return nil, errors.Wrapf(err, "fail to get object %v", path) + } + return object, nil +} + +func (s *Swift) Upload(ctx context.Context, reader io.Reader, path string) error { + path = s.fullPath(path) + segmentPath, err := s.segmentPath(path) + if err != nil { + return errors.Wrapf(err, "fail to generate segment path") + } + object, err := s.conn.DynamicLargeObjectCreateFile(&swift.LargeObjectOpts{ + ObjectName: path, + ContentType: contentType, + Container: s.cfg.Container, + SegmentContainer: s.cfg.Container, + SegmentPrefix: segmentPath, + ChunkSize: s.cfg.ChunkSize, + }) + if err != nil { + return errors.Wrapf(err, "fail to create a dynamic large object %v", path) + } + defer object.Close() + + _, err = io.Copy(object, reader) + if err != nil { + return errors.Wrapf(err, "fail to upload content of object %v", path) + } + + err = object.Flush() + if err != nil { + return errors.Wrapf(err, "fail to flush object %v", path) + } + + return nil +} + +// Size returns the size of the content of the object. A retry mecanism is +// implemented because of the eventual consistency of Swift backends NotFound +// error are sometimes returned when the object was just uploaded. +func (s *Swift) Size(ctx context.Context, path string) (int64, error) { + path = s.fullPath(path) + info, _, err := s.conn.Object(s.cfg.Container, path) + if err != nil { + return -1, errors.Wrapf(err, "fail to get object info %v", path) + } + return info.Bytes, nil +} + +func (s *Swift) Delete(ctx context.Context, path string) error { + path = s.fullPath(path) + err := s.conn.DynamicLargeObjectDelete(s.cfg.Container, path) + if err != nil { + return errors.Wrapf(err, "fail to delete object %v", path) + } + return nil +} + +func (s *Swift) segmentPath(path string) (string, error) { + checksum := sha1.New() + random := make([]byte, 32) + if _, err := rand.Read(random); err != nil { + return "", err + } + path = hex.EncodeToString(checksum.Sum(append([]byte(path), random...))) + return strings.TrimLeft(strings.TrimRight(s.cfg.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil +} + +func (s *Swift) fullPath(path string) string { + return strings.TrimLeft(s.cfg.Prefix+"/"+path, "/") +} From 8478334c976f60176e4aec9d15e02b60b0eaf7e3 Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 15 Aug 2019 20:11:42 +0200 Subject: [PATCH 2/5] Use gomock with gomock_generator to create mock, add mock on storage.Backend --- mocks.json | 12 +++ mocks_sig.json | 1 + .../{gomock_producer.go => producer_mock.go} | 9 +- storage/backend.go | 18 ++++ storage/storagemock/backend_mock.go | 94 +++++++++++++++++++ 5 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 mocks.json create mode 100644 mocks_sig.json rename nsqproducer/nsqproducermock/{gomock_producer.go => producer_mock.go} (94%) create mode 100644 storage/backend.go create mode 100644 storage/storagemock/backend_mock.go diff --git a/mocks.json b/mocks.json new file mode 100644 index 00000000..7cf29f85 --- /dev/null +++ b/mocks.json @@ -0,0 +1,12 @@ +{ + "base_package": "github.com/Scalingo/go-utils", + "mocks": [ + { + "interface": "Backend", + "src_package": "storage" + }, { + "interface": "Producer", + "src_package": "nsqproducer" + } + ] +} diff --git a/mocks_sig.json b/mocks_sig.json new file mode 100644 index 00000000..a97415d8 --- /dev/null +++ b/mocks_sig.json @@ -0,0 +1 @@ +{"github.com/Scalingo/go-utils/nsqproducer.Producer":"33 c7 9b 68 1a 5f 11 fc 4f cb 66 83 92 27 61 b9 c4 ec 87 7b","github.com/Scalingo/go-utils/storage.Backend":"NOFILE"} \ No newline at end of file diff --git a/nsqproducer/nsqproducermock/gomock_producer.go b/nsqproducer/nsqproducermock/producer_mock.go similarity index 94% rename from nsqproducer/nsqproducermock/gomock_producer.go rename to nsqproducer/nsqproducermock/producer_mock.go index e74110de..c3e60ca1 100644 --- a/nsqproducer/nsqproducermock/gomock_producer.go +++ b/nsqproducer/nsqproducermock/producer_mock.go @@ -6,9 +6,10 @@ package nsqproducermock import ( context "context" + reflect "reflect" + nsqproducer "github.com/Scalingo/go-utils/nsqproducer" gomock "github.com/golang/mock/gomock" - reflect "reflect" ) // MockProducer is a mock of Producer interface @@ -36,6 +37,7 @@ func (m *MockProducer) EXPECT() *MockProducerMockRecorder { // DeferredPublish mocks base method func (m *MockProducer) DeferredPublish(arg0 context.Context, arg1 string, arg2 int64, arg3 nsqproducer.NsqMessageSerialize) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeferredPublish", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 @@ -43,11 +45,13 @@ func (m *MockProducer) DeferredPublish(arg0 context.Context, arg1 string, arg2 i // DeferredPublish indicates an expected call of DeferredPublish func (mr *MockProducerMockRecorder) DeferredPublish(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeferredPublish", reflect.TypeOf((*MockProducer)(nil).DeferredPublish), arg0, arg1, arg2, arg3) } // Publish mocks base method func (m *MockProducer) Publish(arg0 context.Context, arg1 string, arg2 nsqproducer.NsqMessageSerialize) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Publish", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 @@ -55,15 +59,18 @@ func (m *MockProducer) Publish(arg0 context.Context, arg1 string, arg2 nsqproduc // Publish indicates an expected call of Publish func (mr *MockProducerMockRecorder) Publish(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockProducer)(nil).Publish), arg0, arg1, arg2) } // Stop mocks base method func (m *MockProducer) Stop() { + m.ctrl.T.Helper() m.ctrl.Call(m, "Stop") } // Stop indicates an expected call of Stop func (mr *MockProducerMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockProducer)(nil).Stop)) } diff --git a/storage/backend.go b/storage/backend.go new file mode 100644 index 00000000..cff10454 --- /dev/null +++ b/storage/backend.go @@ -0,0 +1,18 @@ +package storage + +import ( + "context" + "io" +) + +// Backend represents something which is able to store files on an object +// storage service +type Backend interface { + Get(ctx context.Context, path string) (io.ReadCloser, error) + Upload(ctx context.Context, file io.Reader, path string) error + Size(ctx context.Context, path string) (int64, error) + Delete(ctx context.Context, path string) error +} + +var _ Backend = &S3{} +var _ Backend = &Swift{} diff --git a/storage/storagemock/backend_mock.go b/storage/storagemock/backend_mock.go new file mode 100644 index 00000000..e75284e3 --- /dev/null +++ b/storage/storagemock/backend_mock.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/Scalingo/go-utils/storage (interfaces: Backend) + +// Package storagemock is a generated GoMock package. +package storagemock + +import ( + context "context" + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockBackend is a mock of Backend interface +type MockBackend struct { + ctrl *gomock.Controller + recorder *MockBackendMockRecorder +} + +// MockBackendMockRecorder is the mock recorder for MockBackend +type MockBackendMockRecorder struct { + mock *MockBackend +} + +// NewMockBackend creates a new mock instance +func NewMockBackend(ctrl *gomock.Controller) *MockBackend { + mock := &MockBackend{ctrl: ctrl} + mock.recorder = &MockBackendMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockBackend) EXPECT() *MockBackendMockRecorder { + return m.recorder +} + +// Delete mocks base method +func (m *MockBackend) Delete(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockBackendMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockBackend)(nil).Delete), arg0, arg1) +} + +// Get mocks base method +func (m *MockBackend) Get(arg0 context.Context, arg1 string) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0, arg1) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get +func (mr *MockBackendMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockBackend)(nil).Get), arg0, arg1) +} + +// Size mocks base method +func (m *MockBackend) Size(arg0 context.Context, arg1 string) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Size indicates an expected call of Size +func (mr *MockBackendMockRecorder) Size(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockBackend)(nil).Size), arg0, arg1) +} + +// Upload mocks base method +func (m *MockBackend) Upload(arg0 context.Context, arg1 io.Reader, arg2 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Upload", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Upload indicates an expected call of Upload +func (mr *MockBackendMockRecorder) Upload(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockBackend)(nil).Upload), arg0, arg1, arg2) +} From 06b7ce27132cdc0d2338df64de9526442de5b60e Mon Sep 17 00:00:00 2001 From: Soulou Date: Thu, 15 Aug 2019 20:13:50 +0200 Subject: [PATCH 3/5] Update github.com/golang/mock --- Gopkg.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index be8578fc..f05e6da2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -114,12 +114,12 @@ version = "v1.0.0" [[projects]] - digest = "1:a1bad350477afbc84e8cbe5c78be4579478c55335377239631ff0adb985fbabc" + digest = "1:68c64bb61d55dcd17c82ca0b871ddddb5ae18b30cfe26f6bfd4b6df6287dc2e0" name = "github.com/golang/mock" packages = ["gomock"] pruneopts = "" - revision = "13f360950a79f5864a972c786a10a50e44b69541" - version = "v1.0.0" + revision = "9fa652df1129bef0e734c9cf9bf6dbae9ef3b9fa" + version = "1.3.1" [[projects]] digest = "1:bcb38c8fc9b21bb8682ce2d605a7d4aeb618abc7f827e3ac0b27c0371fdb23fb" From 133c9c26553d18b385ca03eaa0001aed4cfb77dc Mon Sep 17 00:00:00 2001 From: Soulou Date: Fri, 16 Aug 2019 10:06:42 +0200 Subject: [PATCH 4/5] Add configurability for the retry policy --- storage/backend.go | 10 +++++ storage/s3.go | 101 +++++++++++++++++++++++++++++++++------------ 2 files changed, 84 insertions(+), 27 deletions(-) diff --git a/storage/backend.go b/storage/backend.go index cff10454..5398e3cd 100644 --- a/storage/backend.go +++ b/storage/backend.go @@ -5,6 +5,16 @@ import ( "io" ) +// BackendMethod represents the name of a Method included in the Backend interface +type BackendMethod string + +const ( + GetMethod BackendMethod = "Get" + UploadMethod BackendMethod = "Upload" + SizeMethod BackendMethod = "Size" + DeleteMethod BackendMethod = "Delete" +) + // Backend represents something which is able to store files on an object // storage service type Backend interface { diff --git a/storage/s3.go b/storage/s3.go index 27c0c781..bb3b2fae 100644 --- a/storage/s3.go +++ b/storage/s3.go @@ -33,20 +33,45 @@ type S3Config struct { Bucket string } +type RetryPolicy struct { + WaitDuration time.Duration + Attempts int + MethodHandlers map[BackendMethod][]string +} + type S3 struct { - cfg S3Config - s3client S3Client - s3uploader *s3manager.Uploader - retryWaitDuration time.Duration - retryAttempts int + cfg S3Config + s3client S3Client + s3uploader *s3manager.Uploader + retryPolicy RetryPolicy +} + +type s3Opt func(s3 *S3) + +// WithRetryPolicy is an option to constructor NewS3 to add a Retry Policy +// impacting GET operations +func WithRetryPolicy(policy RetryPolicy) s3Opt { + return s3Opt(func(s3 *S3) { + s3.retryPolicy = policy + }) } -func NewS3(cfg S3Config, bucket string) *S3 { +func NewS3(cfg S3Config, opts ...s3Opt) *S3 { s3config := s3Config(cfg) - return &S3{ + s3 := &S3{ cfg: cfg, s3client: s3.New(s3config), s3uploader: s3manager.NewUploader(s3config), - retryWaitDuration: time.Second, retryAttempts: 3, + retryPolicy: RetryPolicy{ + WaitDuration: time.Second, + Attempts: 3, + MethodHandlers: map[BackendMethod][]string{ + SizeMethod: []string{NotFoundErrCode}, + }, + }, } + for _, opt := range opts { + opt(s3) + } + return s3 } func (s *S3) Get(ctx context.Context, path string) (io.ReadCloser, error) { @@ -82,27 +107,21 @@ func (s *S3) Upload(ctx context.Context, file io.Reader, path string) error { // implemented because of the eventual consistency of S3 backends NotFound // error are sometimes returned when the object was just uploaded. func (s *S3) Size(ctx context.Context, path string) (int64, error) { - var ( - err error - stat *s3.HeadObjectResponse - log = logger.Get(ctx) - input = &s3.HeadObjectInput{Bucket: &s.cfg.Bucket, Key: &path} - ) - for i := 0; i < s.retryAttempts; i++ { - stat, err = s.s3client.HeadObjectRequest(input).Send(ctx) - if err == nil { - return *stat.ContentLength, nil + var res int64 + err := s.retryWrapper(ctx, SizeMethod, func(ctx context.Context) error { + log := logger.Get(ctx).WithField("key", path) + log.Infof("[s3] Size()") + + input := &s3.HeadObjectInput{Bucket: &s.cfg.Bucket, Key: &path} + stat, err := s.s3client.HeadObjectRequest(input).Send(ctx) + if err != nil { + return err } + res = *stat.ContentLength + return nil + }) - if aerr, ok := err.(awserr.Error); ok && aerr.Code() == NotFoundErrCode { - log.WithField("key", path).Info("[s3] retry HEAD") - time.Sleep(s.retryWaitDuration) - continue - } - return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path) - } - - return -1, errors.Wrapf(err, "fail to HEAD object '%v' after %v retries", path, s.retryAttempts) + return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path) } func (s *S3) Delete(ctx context.Context, path string) error { @@ -116,6 +135,33 @@ func (s *S3) Delete(ctx context.Context, path string) error { return nil } +func (s *S3) retryWrapper(ctx context.Context, method BackendMethod, fun func(ctx context.Context) error) error { + var err error + + errorCodes := s.retryPolicy.MethodHandlers[method] + // no-op is no retry policy on the method + if errorCodes == nil { + return fun(ctx) + } + for i := 0; i < s.retryPolicy.Attempts; i++ { + log := logger.Get(ctx).WithField("attempt", i+1) + ctx := logger.ToCtx(ctx, log) + err = fun(ctx) + if err == nil { + return nil + } + if aerr, ok := err.(awserr.Error); ok { + for _, code := range errorCodes { + if aerr.Code() == code { + time.Sleep(s.retryPolicy.WaitDuration) + return err + } + } + } + } + return err +} + func s3Config(cfg S3Config) aws.Config { credentials := aws.NewStaticCredentialsProvider(cfg.AK, cfg.SK, "") config := aws.Config{ @@ -131,5 +177,6 @@ func s3Config(cfg S3Config) aws.Config { if cfg.Endpoint == "" { config.EndpointResolver = endpoints.NewDefaultResolver() } + return config } From e16d54fc8daf68104314b89556235992e7226cba Mon Sep 17 00:00:00 2001 From: Soulou Date: Fri, 16 Aug 2019 10:17:40 +0200 Subject: [PATCH 5/5] Fix specs --- storage/s3.go | 6 ++++-- storage/s3_test.go | 9 ++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/storage/s3.go b/storage/s3.go index bb3b2fae..a8403d80 100644 --- a/storage/s3.go +++ b/storage/s3.go @@ -121,7 +121,10 @@ func (s *S3) Size(ctx context.Context, path string) (int64, error) { return nil }) - return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path) + if err != nil { + return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path) + } + return res, nil } func (s *S3) Delete(ctx context.Context, path string) error { @@ -154,7 +157,6 @@ func (s *S3) retryWrapper(ctx context.Context, method BackendMethod, fun func(ct for _, code := range errorCodes { if aerr.Code() == code { time.Sleep(s.retryPolicy.WaitDuration) - return err } } } diff --git a/storage/s3_test.go b/storage/s3_test.go index 601ee5a5..99bc5b82 100644 --- a/storage/s3_test.go +++ b/storage/s3_test.go @@ -86,9 +86,12 @@ func TestS3_Size(t *testing.T) { mock := s3mock.NewMockS3Client(ctrl) storage := &S3{ - cfg: S3Config{Bucket: "bucket"}, - s3client: mock, - retryAttempts: 3, retryWaitDuration: 50 * time.Millisecond, + cfg: S3Config{Bucket: "bucket"}, + s3client: mock, + retryPolicy: RetryPolicy{ + Attempts: 3, WaitDuration: 50 * time.Millisecond, + MethodHandlers: map[BackendMethod][]string{SizeMethod: []string{NotFoundErrCode}}, + }, } c.expectMock(t, mock)