Skip to content

Commit

Permalink
backend/storage:Add Conditions to CreateObject
Browse files Browse the repository at this point in the history
Move the "Preconditions"-checking into the backend, to be done under the
protection of the backend mutex. This enables `DoesNotExist`-behavior to be
implemented race-free
  • Loading branch information
rawler committed Sep 29, 2022
1 parent d615b8d commit 658d71e
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 71 deletions.
6 changes: 6 additions & 0 deletions fakestorage/json_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"os"
"syscall"

"github.com/fsouza/fake-gcs-server/internal/backend"
)

type jsonResponse struct {
Expand Down Expand Up @@ -63,5 +65,9 @@ func errToJsonResponse(err error) jsonResponse {
if errors.As(err, &pathError) && pathError.Err == syscall.ENAMETOOLONG {
status = http.StatusBadRequest
}
var backendError backend.Error
if errors.As(err, &backendError) && backendError == backend.PreConditionFailed {
status = http.StatusPreconditionFailed
}
return jsonResponse{errorMessage: err.Error(), status: status}
}
10 changes: 5 additions & 5 deletions fakestorage/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ func (s *Server) CreateObject(obj Object) {
// If the bucket within the object doesn't exist, it also creates it. If the
// object already exists, it overwrites the object.
func (s *Server) CreateObjectStreaming(obj StreamingObject) error {
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return err
}
obj.Close()
return nil
}

func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
func (s *Server) createObject(obj StreamingObject, conditions backend.Conditions) (StreamingObject, error) {
oldBackendObj, err := s.backend.GetObject(obj.BucketName, obj.Name)
// Calling Close before checking err is okay on objects, and the object
// may need to be closed whether or not there's an error.
Expand All @@ -306,7 +306,7 @@ func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
prevVersionExisted := err == nil

// The caller is responsible for closing the created object.
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0])
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0], conditions)
if err != nil {
return StreamingObject{}, err
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func (s *Server) setObjectACL(r *http.Request) jsonResponse {
Role: role,
}}

obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -735,7 +735,7 @@ func (s *Server) rewriteObject(r *http.Request) jsonResponse {
Content: obj.Content,
}

created, err := s.createObject(newObject)
created, err := s.createObject(newObject, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
87 changes: 38 additions & 49 deletions fakestorage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"cloud.google.com/go/storage"
"github.com/fsouza/fake-gcs-server/internal/backend"
"github.com/fsouza/fake-gcs-server/internal/checksum"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -46,6 +47,21 @@ type contentRange struct {
Total int // Total bytes expected, -1 if unknown
}

type generationCondition struct {
ifGenerationMatch *int64
ifGenerationNotMatch *int64
}

func (c generationCondition) ConditionsMet(activeGeneration int64) bool {
if c.ifGenerationMatch != nil && *c.ifGenerationMatch != activeGeneration {
return false
}
if c.ifGenerationNotMatch != nil && *c.ifGenerationNotMatch == activeGeneration {
return false
}
return true
}

func (s *Server) insertObject(r *http.Request) jsonResponse {
bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"]

Expand Down Expand Up @@ -136,72 +152,40 @@ func (s *Server) insertFormObject(r *http.Request) xmlResponse {
},
Content: infile,
}
obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return xmlResponse{errorMessage: err.Error()}
}
defer obj.Close()
return xmlResponse{status: http.StatusNoContent}
}

func (s *Server) checkUploadPreconditions(r *http.Request, bucketName string, objectName string) *jsonResponse {
func (s *Server) wrapUploadPreconditions(r *http.Request, bucketName string, objectName string) (*generationCondition, error) {
result := &generationCondition{
ifGenerationMatch: nil,
ifGenerationNotMatch: nil,
}
ifGenerationMatch := r.URL.Query().Get("ifGenerationMatch")

if ifGenerationMatch != "" {
gen, err := strconv.ParseInt(ifGenerationMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
if gen == 0 {
_, err := s.backend.GetObject(bucketName, objectName)
if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if _, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen); err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return nil, err
}
return nil
result.ifGenerationMatch = &gen
}

ifGenerationNotMatch := r.URL.Query().Get("ifGenerationNotMatch")

if ifGenerationNotMatch != "" {
gen, err := strconv.ParseInt(ifGenerationNotMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
obj, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen)
// Calling Close before checking err is okay on objects, and the return
// path below is complicated.
defer obj.Close() //lint:ignore SA5001 // see above
if gen == 0 {
if err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return nil, err
}
result.ifGenerationNotMatch = &gen
}

return nil
return result, nil
}

func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
Expand All @@ -225,7 +209,7 @@ func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -271,7 +255,7 @@ func (s *Server) signedUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -339,8 +323,12 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
objName = metadata.Name
}

if resp := s.checkUploadPreconditions(r, bucketName, objName); resp != nil {
return *resp
conditions, err := s.wrapUploadPreconditions(r, bucketName, objName)
if err != nil {
return jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}

obj := StreamingObject{
Expand All @@ -354,7 +342,8 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
},
Content: notImplementedSeeker{io.NopCloser(io.MultiReader(partReaders...))},
}
obj, err = s.createObject(obj)

obj, err = s.createObject(obj, *conditions)
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -490,7 +479,7 @@ func (s *Server) uploadFileContent(r *http.Request) jsonResponse {
}
if commit {
s.uploads.Delete(uploadID)
streamingObject, err := s.createObject(obj.StreamingObject())
streamingObject, err := s.createObject(obj.StreamingObject(), backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func shouldError(t *testing.T, err error) {

func uploadAndCompare(t *testing.T, storage Storage, obj Object) int64 {
isFSStorage := reflect.TypeOf(storage) == reflect.TypeOf(&storageFS{})
newObject, err := storage.CreateObject(obj.StreamingObject())
newObject, err := storage.CreateObject(obj.StreamingObject(), NoConditions{})
if isFSStorage && obj.Generation != 0 {
t.Log("FS should not support objects generation")
shouldError(t, err)
obj.Generation = 0
newObject, err = storage.CreateObject(obj.StreamingObject())
newObject, err = storage.CreateObject(obj.StreamingObject(), NoConditions{})
}
noError(t, err)
newObject.Close()
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestObjectQueryErrors(t *testing.T) {
},
Content: []byte("random-content"),
}
obj, err := storage.CreateObject(validObject.StreamingObject())
obj, err := storage.CreateObject(validObject.StreamingObject(), NoConditions{})
noError(t, err)
obj.Close()
_, err = storage.GetObjectWithGeneration(validObject.BucketName, validObject.Name, 33333)
Expand Down
26 changes: 19 additions & 7 deletions internal/backend/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewStorageFS(objects []StreamingObject, rootDir string) (Storage, error) {

s := &storageFS{rootDir: rootDir, mh: mh}
for _, o := range objects {
obj, err := s.CreateObject(o)
obj, err := s.CreateObject(o, NoConditions{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *storageFS) DeleteBucket(name string) error {
// The crc32c checksum and md5 hash of the object content is calculated when
// reading the object content. Any checksum or hash in the passed-in object
// metadata is overwritten.
func (s *storageFS) CreateObject(obj StreamingObject) (StreamingObject, error) {
func (s *storageFS) CreateObject(obj StreamingObject, conditions Conditions) (StreamingObject, error) {
if obj.Generation > 0 {
return StreamingObject{}, errors.New("not implemented: fs storage type does not support objects generation yet")
}
Expand All @@ -165,6 +165,18 @@ func (s *storageFS) CreateObject(obj StreamingObject) (StreamingObject, error) {
return StreamingObject{}, err
}

var activeGeneration int64
existingObj, err := s.getObject(obj.BucketName, obj.Name)
if err != nil {
activeGeneration = 0
} else {
activeGeneration = existingObj.Generation
}

if !conditions.ConditionsMet(activeGeneration) {
return StreamingObject{}, PreConditionFailed
}

path := filepath.Join(s.rootDir, url.PathEscape(obj.BucketName), url.PathEscape(obj.Name))

tempFile, err := os.CreateTemp(filepath.Dir(path), "fake-gcs-object")
Expand Down Expand Up @@ -342,8 +354,8 @@ func (s *storageFS) PatchObject(bucketName, objectName string, metadata map[stri
for k, v := range metadata {
obj.Metadata[k] = v
}
obj.Generation = 0 // reset generation id
return s.CreateObject(obj) // recreate object
obj.Generation = 0 // reset generation id
return s.CreateObject(obj, NoConditions{}) // recreate object
}

// UpdateObject replaces the given object metadata.
Expand All @@ -357,8 +369,8 @@ func (s *storageFS) UpdateObject(bucketName, objectName string, metadata map[str
for k, v := range metadata {
obj.Metadata[k] = v
}
obj.Generation = 0 // reset generation id
return s.CreateObject(obj) // recreate object
obj.Generation = 0 // reset generation id
return s.CreateObject(obj, NoConditions{}) // recreate object
}

type concatenatedContent struct {
Expand Down Expand Up @@ -404,7 +416,7 @@ func (s *storageFS) ComposeObject(bucketName string, objectNames []string, desti
dest.Content = concatObjectReaders(sourceObjects)
dest.Metadata = metadata

result, err := s.CreateObject(dest)
result, err := s.CreateObject(dest, NoConditions{})
if err != nil {
return result, err
}
Expand Down
24 changes: 20 additions & 4 deletions internal/backend/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ func findObject(obj Object, objectList []Object, matchGeneration bool) int {
return -1
}

// findObject looks for an object in the given list and return the index where it
// was found, or -1 if the object doesn't exist.
func findLastObjectGeneration(obj Object, objectList []Object) int64 {
highScore := int64(0)
for _, o := range objectList {
if obj.IDNoGen() == o.IDNoGen() && o.Generation > highScore {
highScore = o.Generation
}
}
return highScore
}

// NewStorageMemory creates an instance of StorageMemory.
func NewStorageMemory(objects []StreamingObject) (Storage, error) {
s := &storageMemory{
Expand Down Expand Up @@ -189,14 +201,18 @@ func (s *storageMemory) DeleteBucket(name string) error {
}

// CreateObject stores an object in the backend.
func (s *storageMemory) CreateObject(obj StreamingObject) (StreamingObject, error) {
func (s *storageMemory) CreateObject(obj StreamingObject, conditions Conditions) (StreamingObject, error) {
s.mtx.Lock()
defer s.mtx.Unlock()
bucketInMemory, err := s.getBucketInMemory(obj.BucketName)
if err != nil {
bucketInMemory = newBucketInMemory(obj.BucketName, false)
}
bufferedObj, err := obj.BufferedObject()
currentGeneration := findLastObjectGeneration(bufferedObj, bucketInMemory.activeObjects)
if !conditions.ConditionsMet(currentGeneration) {
return StreamingObject{}, PreConditionFailed
}
if err != nil {
return StreamingObject{}, err
}
Expand Down Expand Up @@ -295,7 +311,7 @@ func (s *storageMemory) PatchObject(bucketName, objectName string, metadata map[
for k, v := range metadata {
obj.Metadata[k] = v
}
s.CreateObject(obj) // recreate object
s.CreateObject(obj, NoConditions{}) // recreate object
return obj, nil
}

Expand All @@ -309,7 +325,7 @@ func (s *storageMemory) UpdateObject(bucketName, objectName string, metadata map
for k, v := range metadata {
obj.Metadata[k] = v
}
s.CreateObject(obj) // recreate object
s.CreateObject(obj, NoConditions{}) // recreate object
return obj, nil
}

Expand Down Expand Up @@ -351,7 +367,7 @@ func (s *storageMemory) ComposeObject(bucketName string, objectNames []string, d
dest.Etag = fmt.Sprintf("%q", dest.Md5Hash)
dest.Metadata = metadata

result, err := s.CreateObject(dest.StreamingObject())
result, err := s.CreateObject(dest.StreamingObject(), NoConditions{})
if err != nil {
return result, err
}
Expand Down
Loading

0 comments on commit 658d71e

Please sign in to comment.