diff --git a/fakestorage/json_response.go b/fakestorage/json_response.go index b0f2f429dc..731d806bcb 100644 --- a/fakestorage/json_response.go +++ b/fakestorage/json_response.go @@ -6,6 +6,8 @@ import ( "net/http" "os" "syscall" + + "github.com/fsouza/fake-gcs-server/internal/backend" ) type jsonResponse struct { @@ -63,5 +65,8 @@ func errToJsonResponse(err error) jsonResponse { if errors.As(err, &pathError) && pathError.Err == syscall.ENAMETOOLONG { status = http.StatusBadRequest } + if err == backend.PreConditionFailed { + status = http.StatusPreconditionFailed + } return jsonResponse{errorMessage: err.Error(), status: status} } diff --git a/fakestorage/object.go b/fakestorage/object.go index 8a251a870c..5c124b56b0 100644 --- a/fakestorage/object.go +++ b/fakestorage/object.go @@ -289,7 +289,7 @@ func (s *Server) CreateObject(obj Object) { // If the bucket within the object doesn't exist, it also creates it. If the // object already exists, it overwrites the object. func (s *Server) CreateObjectStreaming(obj StreamingObject) error { - obj, err := s.createObject(obj) + obj, err := s.createObject(obj, backend.NoConditions{}) if err != nil { return err } @@ -297,7 +297,7 @@ func (s *Server) CreateObjectStreaming(obj StreamingObject) error { return nil } -func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) { +func (s *Server) createObject(obj StreamingObject, conditions backend.Conditions) (StreamingObject, error) { oldBackendObj, err := s.backend.GetObject(obj.BucketName, obj.Name) // Calling Close before checking err is okay on objects, and the object // may need to be closed whether or not there's an error. @@ -306,7 +306,7 @@ func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) { prevVersionExisted := err == nil // The caller is responsible for closing the created object. - newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0]) + newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0], conditions) if err != nil { return StreamingObject{}, err } @@ -680,7 +680,7 @@ func (s *Server) setObjectACL(r *http.Request) jsonResponse { Role: role, }} - obj, err = s.createObject(obj) + obj, err = s.createObject(obj, backend.NoConditions{}) if err != nil { return errToJsonResponse(err) } @@ -735,7 +735,7 @@ func (s *Server) rewriteObject(r *http.Request) jsonResponse { Content: obj.Content, } - created, err := s.createObject(newObject) + created, err := s.createObject(newObject, backend.NoConditions{}) if err != nil { return errToJsonResponse(err) } diff --git a/fakestorage/upload.go b/fakestorage/upload.go index 61d6570fdb..42b8c3c0fb 100644 --- a/fakestorage/upload.go +++ b/fakestorage/upload.go @@ -19,6 +19,7 @@ import ( "strings" "cloud.google.com/go/storage" + "github.com/fsouza/fake-gcs-server/internal/backend" "github.com/fsouza/fake-gcs-server/internal/checksum" "github.com/gorilla/mux" ) @@ -46,6 +47,21 @@ type contentRange struct { Total int // Total bytes expected, -1 if unknown } +type generationCondition struct { + ifGenerationMatch *int64 + ifGenerationNotMatch *int64 +} + +func (c generationCondition) ConditionsMet(activeGeneration int64) bool { + if c.ifGenerationMatch != nil && *c.ifGenerationMatch != activeGeneration { + return false + } + if c.ifGenerationNotMatch != nil && *c.ifGenerationNotMatch == activeGeneration { + return false + } + return true +} + func (s *Server) insertObject(r *http.Request) jsonResponse { bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"] @@ -136,7 +152,7 @@ func (s *Server) insertFormObject(r *http.Request) xmlResponse { }, Content: infile, } - obj, err = s.createObject(obj) + obj, err = s.createObject(obj, backend.NoConditions{}) if err != nil { return xmlResponse{errorMessage: err.Error()} } @@ -144,32 +160,19 @@ func (s *Server) insertFormObject(r *http.Request) xmlResponse { return xmlResponse{status: http.StatusNoContent} } -func (s *Server) checkUploadPreconditions(r *http.Request, bucketName string, objectName string) *jsonResponse { +func (s *Server) wrapUploadPreconditions(r *http.Request, bucketName string, objectName string) (generationCondition, error) { + result := generationCondition{ + ifGenerationMatch: nil, + ifGenerationNotMatch: nil, + } ifGenerationMatch := r.URL.Query().Get("ifGenerationMatch") if ifGenerationMatch != "" { gen, err := strconv.ParseInt(ifGenerationMatch, 10, 64) if err != nil { - return &jsonResponse{ - status: http.StatusBadRequest, - errorMessage: err.Error(), - } - } - if gen == 0 { - _, err := s.backend.GetObject(bucketName, objectName) - if err == nil { - return &jsonResponse{ - status: http.StatusPreconditionFailed, - errorMessage: "Precondition failed", - } - } - } else if _, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen); err != nil { - return &jsonResponse{ - status: http.StatusPreconditionFailed, - errorMessage: "Precondition failed", - } + return generationCondition{}, err } - return nil + result.ifGenerationMatch = &gen } ifGenerationNotMatch := r.URL.Query().Get("ifGenerationNotMatch") @@ -177,31 +180,12 @@ func (s *Server) checkUploadPreconditions(r *http.Request, bucketName string, ob if ifGenerationNotMatch != "" { gen, err := strconv.ParseInt(ifGenerationNotMatch, 10, 64) if err != nil { - return &jsonResponse{ - status: http.StatusBadRequest, - errorMessage: err.Error(), - } - } - obj, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen) - // Calling Close before checking err is okay on objects, and the return - // path below is complicated. - defer obj.Close() //lint:ignore SA5001 // see above - if gen == 0 { - if err != nil { - return &jsonResponse{ - status: http.StatusPreconditionFailed, - errorMessage: "Precondition failed", - } - } - } else if err == nil { - return &jsonResponse{ - status: http.StatusPreconditionFailed, - errorMessage: "Precondition failed", - } + return generationCondition{}, err } + result.ifGenerationNotMatch = &gen } - return nil + return result, nil } func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse { @@ -225,7 +209,7 @@ func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse { }, Content: notImplementedSeeker{r.Body}, } - obj, err := s.createObject(obj) + obj, err := s.createObject(obj, backend.NoConditions{}) if err != nil { return errToJsonResponse(err) } @@ -271,7 +255,7 @@ func (s *Server) signedUpload(bucketName string, r *http.Request) jsonResponse { }, Content: notImplementedSeeker{r.Body}, } - obj, err := s.createObject(obj) + obj, err := s.createObject(obj, backend.NoConditions{}) if err != nil { return errToJsonResponse(err) } @@ -339,8 +323,12 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons objName = metadata.Name } - if resp := s.checkUploadPreconditions(r, bucketName, objName); resp != nil { - return *resp + conditions, err := s.wrapUploadPreconditions(r, bucketName, objName) + if err != nil { + return jsonResponse{ + status: http.StatusBadRequest, + errorMessage: err.Error(), + } } obj := StreamingObject{ @@ -354,7 +342,8 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons }, Content: notImplementedSeeker{io.NopCloser(io.MultiReader(partReaders...))}, } - obj, err = s.createObject(obj) + + obj, err = s.createObject(obj, conditions) if err != nil { return errToJsonResponse(err) } @@ -490,7 +479,7 @@ func (s *Server) uploadFileContent(r *http.Request) jsonResponse { } if commit { s.uploads.Delete(uploadID) - streamingObject, err := s.createObject(obj.StreamingObject()) + streamingObject, err := s.createObject(obj.StreamingObject(), backend.NoConditions{}) if err != nil { return errToJsonResponse(err) } diff --git a/fakestorage/upload_test.go b/fakestorage/upload_test.go index da68465ea9..4672e8166e 100644 --- a/fakestorage/upload_test.go +++ b/fakestorage/upload_test.go @@ -9,6 +9,7 @@ import ( "compress/gzip" "context" "crypto/tls" + "encoding/binary" "encoding/json" "fmt" "io" @@ -389,6 +390,77 @@ func TestServerClientObjectOperationsFailureToWriteExistingObject(t *testing.T) }) } +func TestServerClientUploadRacesAreOnlyWonByOne(t *testing.T) { + const ( + bucketName = "some-bucket" + repetitions = 10 + parallelism = 5 + ) + + type workerResult struct { + success bool + worker uint16 + } + + runServersTest(t, runServersOptions{enableFSBackend: true}, func(t *testing.T, server *Server) { + bucket := server.Client().Bucket(bucketName) + if err := bucket.Create(context.Background(), "my-project", nil); err != nil { + t.Fatal(err) + } + + // Repeat test to increase chance of detecting race + for i := 0; i < repetitions; i++ { + objHandle := bucket.Object(fmt.Sprintf("object-%d.bin", i)) + + results := make(chan workerResult) + for j := uint16(0); j < parallelism; j++ { + go func(workerIndex uint16) { + writer := objHandle.If(storage.Conditions{DoesNotExist: true}).NewWriter(context.Background()) + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, workerIndex) + writer.Write(buf) + results <- workerResult{success: writer.Close() == nil, worker: workerIndex} + }(j) + } + + var successes int + var failures int + var winner uint16 + for j := 0; j < parallelism; j++ { + result := <-results + if result.success { + successes++ + winner = result.worker + } else { + failures++ + } + } + + if successes != 1 { + t.Errorf("in attempt %d, expected 1 success but got %d", i, successes) + } + if failures != parallelism-1 { + t.Errorf("in attempt %d, expected %d failures but got %d", i, parallelism-1, failures) + } + reader, err := objHandle.NewReader(context.Background()) + if err != nil { + t.Errorf("in attempt %d, readback failed with %#v", i, err) + } + buf := make([]byte, 2) + l, err := reader.Read(buf) + if err != nil { + t.Errorf("in attempt %d, readback.read failed with %#v", i, err) + } + if l != 2 { + t.Errorf("in attempt %d, insufficient bytes read", i) + } + if winner != binary.BigEndian.Uint16(buf) { + t.Errorf("in attempt %d, %d were told as winner, but %d actually stored", i, winner, binary.BigEndian.Uint16(buf)) + } + } + }) +} + func TestServerClientObjectWriterBucketNotFound(t *testing.T) { runServersTest(t, runServersOptions{}, func(t *testing.T, server *Server) { client := server.Client() diff --git a/internal/backend/backend_test.go b/internal/backend/backend_test.go index df101720a0..a27f39ac35 100644 --- a/internal/backend/backend_test.go +++ b/internal/backend/backend_test.go @@ -76,12 +76,12 @@ func shouldError(t *testing.T, err error) { func uploadAndCompare(t *testing.T, storage Storage, obj Object) int64 { isFSStorage := reflect.TypeOf(storage) == reflect.TypeOf(&storageFS{}) - newObject, err := storage.CreateObject(obj.StreamingObject()) + newObject, err := storage.CreateObject(obj.StreamingObject(), NoConditions{}) if isFSStorage && obj.Generation != 0 { t.Log("FS should not support objects generation") shouldError(t, err) obj.Generation = 0 - newObject, err = storage.CreateObject(obj.StreamingObject()) + newObject, err = storage.CreateObject(obj.StreamingObject(), NoConditions{}) } noError(t, err) newObject.Close() @@ -224,7 +224,7 @@ func TestObjectQueryErrors(t *testing.T) { }, Content: []byte("random-content"), } - obj, err := storage.CreateObject(validObject.StreamingObject()) + obj, err := storage.CreateObject(validObject.StreamingObject(), NoConditions{}) noError(t, err) obj.Close() _, err = storage.GetObjectWithGeneration(validObject.BucketName, validObject.Name, 33333) diff --git a/internal/backend/fs.go b/internal/backend/fs.go index 8a4736b9a3..4a3bac21f4 100644 --- a/internal/backend/fs.go +++ b/internal/backend/fs.go @@ -62,7 +62,7 @@ func NewStorageFS(objects []StreamingObject, rootDir string) (Storage, error) { s := &storageFS{rootDir: rootDir, mh: mh} for _, o := range objects { - obj, err := s.CreateObject(o) + obj, err := s.CreateObject(o, NoConditions{}) if err != nil { return nil, err } @@ -148,7 +148,7 @@ func (s *storageFS) DeleteBucket(name string) error { // The crc32c checksum and md5 hash of the object content is calculated when // reading the object content. Any checksum or hash in the passed-in object // metadata is overwritten. -func (s *storageFS) CreateObject(obj StreamingObject) (StreamingObject, error) { +func (s *storageFS) CreateObject(obj StreamingObject, conditions Conditions) (StreamingObject, error) { if obj.Generation > 0 { return StreamingObject{}, errors.New("not implemented: fs storage type does not support objects generation yet") } @@ -165,6 +165,18 @@ func (s *storageFS) CreateObject(obj StreamingObject) (StreamingObject, error) { return StreamingObject{}, err } + var activeGeneration int64 + existingObj, err := s.getObject(obj.BucketName, obj.Name) + if err != nil { + activeGeneration = 0 + } else { + activeGeneration = existingObj.Generation + } + + if !conditions.ConditionsMet(activeGeneration) { + return StreamingObject{}, PreConditionFailed + } + path := filepath.Join(s.rootDir, url.PathEscape(obj.BucketName), url.PathEscape(obj.Name)) tempFile, err := os.CreateTemp(filepath.Dir(path), "fake-gcs-object") @@ -342,8 +354,8 @@ func (s *storageFS) PatchObject(bucketName, objectName string, metadata map[stri for k, v := range metadata { obj.Metadata[k] = v } - obj.Generation = 0 // reset generation id - return s.CreateObject(obj) // recreate object + obj.Generation = 0 // reset generation id + return s.CreateObject(obj, NoConditions{}) // recreate object } // UpdateObject replaces the given object metadata. @@ -357,8 +369,8 @@ func (s *storageFS) UpdateObject(bucketName, objectName string, metadata map[str for k, v := range metadata { obj.Metadata[k] = v } - obj.Generation = 0 // reset generation id - return s.CreateObject(obj) // recreate object + obj.Generation = 0 // reset generation id + return s.CreateObject(obj, NoConditions{}) // recreate object } type concatenatedContent struct { @@ -404,7 +416,7 @@ func (s *storageFS) ComposeObject(bucketName string, objectNames []string, desti dest.Content = concatObjectReaders(sourceObjects) dest.Metadata = metadata - result, err := s.CreateObject(dest) + result, err := s.CreateObject(dest, NoConditions{}) if err != nil { return result, err } diff --git a/internal/backend/memory.go b/internal/backend/memory.go index 3ba4cc79ee..80e8bfee90 100644 --- a/internal/backend/memory.go +++ b/internal/backend/memory.go @@ -113,6 +113,18 @@ func findObject(obj Object, objectList []Object, matchGeneration bool) int { return -1 } +// findLastObjectGeneration looks for an object in the given list and return the index where it +// was found, or -1 if the object doesn't exist. +func findLastObjectGeneration(obj Object, objectList []Object) int64 { + highScore := int64(0) + for _, o := range objectList { + if obj.IDNoGen() == o.IDNoGen() && o.Generation > highScore { + highScore = o.Generation + } + } + return highScore +} + // NewStorageMemory creates an instance of StorageMemory. func NewStorageMemory(objects []StreamingObject) (Storage, error) { s := &storageMemory{ @@ -189,7 +201,7 @@ func (s *storageMemory) DeleteBucket(name string) error { } // CreateObject stores an object in the backend. -func (s *storageMemory) CreateObject(obj StreamingObject) (StreamingObject, error) { +func (s *storageMemory) CreateObject(obj StreamingObject, conditions Conditions) (StreamingObject, error) { s.mtx.Lock() defer s.mtx.Unlock() bucketInMemory, err := s.getBucketInMemory(obj.BucketName) @@ -197,6 +209,10 @@ func (s *storageMemory) CreateObject(obj StreamingObject) (StreamingObject, erro bucketInMemory = newBucketInMemory(obj.BucketName, false) } bufferedObj, err := obj.BufferedObject() + currentGeneration := findLastObjectGeneration(bufferedObj, bucketInMemory.activeObjects) + if !conditions.ConditionsMet(currentGeneration) { + return StreamingObject{}, PreConditionFailed + } if err != nil { return StreamingObject{}, err } @@ -295,7 +311,7 @@ func (s *storageMemory) PatchObject(bucketName, objectName string, metadata map[ for k, v := range metadata { obj.Metadata[k] = v } - s.CreateObject(obj) // recreate object + s.CreateObject(obj, NoConditions{}) // recreate object return obj, nil } @@ -309,7 +325,7 @@ func (s *storageMemory) UpdateObject(bucketName, objectName string, metadata map for k, v := range metadata { obj.Metadata[k] = v } - s.CreateObject(obj) // recreate object + s.CreateObject(obj, NoConditions{}) // recreate object return obj, nil } @@ -351,7 +367,7 @@ func (s *storageMemory) ComposeObject(bucketName string, objectNames []string, d dest.Etag = fmt.Sprintf("%q", dest.Md5Hash) dest.Metadata = metadata - result, err := s.CreateObject(dest.StreamingObject()) + result, err := s.CreateObject(dest.StreamingObject(), NoConditions{}) if err != nil { return result, err } diff --git a/internal/backend/storage.go b/internal/backend/storage.go index f85d2d4ac5..596b9cceae 100644 --- a/internal/backend/storage.go +++ b/internal/backend/storage.go @@ -5,6 +5,16 @@ // Package backend proides the backends used by fake-gcs-server. package backend +type Conditions interface { + ConditionsMet(activeGeneration int64) bool +} + +type NoConditions struct{} + +func (NoConditions) ConditionsMet(int64) bool { + return true +} + // Storage is the generic interface for implementing the backend storage of the // server. type Storage interface { @@ -12,7 +22,7 @@ type Storage interface { ListBuckets() ([]Bucket, error) GetBucket(name string) (Bucket, error) DeleteBucket(name string) error - CreateObject(obj StreamingObject) (StreamingObject, error) + CreateObject(obj StreamingObject, conditions Conditions) (StreamingObject, error) ListObjects(bucketName string, prefix string, versions bool) ([]ObjectAttrs, error) GetObject(bucketName, objectName string) (StreamingObject, error) GetObjectWithGeneration(bucketName, objectName string, generation int64) (StreamingObject, error) @@ -27,6 +37,7 @@ type Error string func (e Error) Error() string { return string(e) } const ( - BucketNotFound = Error("bucket not found") - BucketNotEmpty = Error("bucket must be empty prior to deletion") + BucketNotFound = Error("bucket not found") + BucketNotEmpty = Error("bucket must be empty prior to deletion") + PreConditionFailed = Error("Precondition failed") )