Skip to content

Commit

Permalink
Race-free Preconditions (#935)
Browse files Browse the repository at this point in the history
* upload_test: Add UploadRacesAreOnlyWonByOne

This tests for race-conditions in Conditional uploads, by repeatedly
uploading to the same object from multiple go-routines, and verify that
only one succeeded.

* backend/storage:Add `Conditions` to CreateObject

Move the "Preconditions"-checking into the backend, to be done under the
protection of the backend mutex. This enables `DoesNotExist`-behavior to be
implemented race-free

* upload_test: improve diagostics of ..RacesAreOnlyWonByOne

* Apply suggestions from review

Co-authored-by: fsouza <[email protected]>

* upload_test:Use runServersTest in ...RacesAreOnlyWonByOne

While potentially a problem for test-performance, speed of running tests
are currently bearable on my machine

ok  	github.com/fsouza/fake-gcs-server	0.016s
ok  	github.com/fsouza/fake-gcs-server/fakestorage	1.176s
ok  	github.com/fsouza/fake-gcs-server/internal/backend	0.031s
ok  	github.com/fsouza/fake-gcs-server/internal/checksum	0.010s
ok  	github.com/fsouza/fake-gcs-server/internal/config	0.010s
ok  	github.com/fsouza/fake-gcs-server/internal/notification	0.026s

Co-authored-by: fsouza <[email protected]>
  • Loading branch information
rawler and fsouza authored Oct 31, 2022
1 parent 66fa719 commit 4e828df
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 71 deletions.
5 changes: 5 additions & 0 deletions fakestorage/json_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"os"
"syscall"

"github.com/fsouza/fake-gcs-server/internal/backend"
)

type jsonResponse struct {
Expand Down Expand Up @@ -63,5 +65,8 @@ func errToJsonResponse(err error) jsonResponse {
if errors.As(err, &pathError) && pathError.Err == syscall.ENAMETOOLONG {
status = http.StatusBadRequest
}
if err == backend.PreConditionFailed {
status = http.StatusPreconditionFailed
}
return jsonResponse{errorMessage: err.Error(), status: status}
}
10 changes: 5 additions & 5 deletions fakestorage/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ func (s *Server) CreateObject(obj Object) {
// If the bucket within the object doesn't exist, it also creates it. If the
// object already exists, it overwrites the object.
func (s *Server) CreateObjectStreaming(obj StreamingObject) error {
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return err
}
obj.Close()
return nil
}

func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
func (s *Server) createObject(obj StreamingObject, conditions backend.Conditions) (StreamingObject, error) {
oldBackendObj, err := s.backend.GetObject(obj.BucketName, obj.Name)
// Calling Close before checking err is okay on objects, and the object
// may need to be closed whether or not there's an error.
Expand All @@ -306,7 +306,7 @@ func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
prevVersionExisted := err == nil

// The caller is responsible for closing the created object.
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0])
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0], conditions)
if err != nil {
return StreamingObject{}, err
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func (s *Server) setObjectACL(r *http.Request) jsonResponse {
Role: role,
}}

obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -735,7 +735,7 @@ func (s *Server) rewriteObject(r *http.Request) jsonResponse {
Content: obj.Content,
}

created, err := s.createObject(newObject)
created, err := s.createObject(newObject, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
87 changes: 38 additions & 49 deletions fakestorage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"cloud.google.com/go/storage"
"github.com/fsouza/fake-gcs-server/internal/backend"
"github.com/fsouza/fake-gcs-server/internal/checksum"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -46,6 +47,21 @@ type contentRange struct {
Total int // Total bytes expected, -1 if unknown
}

type generationCondition struct {
ifGenerationMatch *int64
ifGenerationNotMatch *int64
}

func (c generationCondition) ConditionsMet(activeGeneration int64) bool {
if c.ifGenerationMatch != nil && *c.ifGenerationMatch != activeGeneration {
return false
}
if c.ifGenerationNotMatch != nil && *c.ifGenerationNotMatch == activeGeneration {
return false
}
return true
}

func (s *Server) insertObject(r *http.Request) jsonResponse {
bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"]

Expand Down Expand Up @@ -136,72 +152,40 @@ func (s *Server) insertFormObject(r *http.Request) xmlResponse {
},
Content: infile,
}
obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return xmlResponse{errorMessage: err.Error()}
}
defer obj.Close()
return xmlResponse{status: http.StatusNoContent}
}

func (s *Server) checkUploadPreconditions(r *http.Request, bucketName string, objectName string) *jsonResponse {
func (s *Server) wrapUploadPreconditions(r *http.Request, bucketName string, objectName string) (generationCondition, error) {
result := generationCondition{
ifGenerationMatch: nil,
ifGenerationNotMatch: nil,
}
ifGenerationMatch := r.URL.Query().Get("ifGenerationMatch")

if ifGenerationMatch != "" {
gen, err := strconv.ParseInt(ifGenerationMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
if gen == 0 {
_, err := s.backend.GetObject(bucketName, objectName)
if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if _, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen); err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return generationCondition{}, err
}
return nil
result.ifGenerationMatch = &gen
}

ifGenerationNotMatch := r.URL.Query().Get("ifGenerationNotMatch")

if ifGenerationNotMatch != "" {
gen, err := strconv.ParseInt(ifGenerationNotMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
obj, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen)
// Calling Close before checking err is okay on objects, and the return
// path below is complicated.
defer obj.Close() //lint:ignore SA5001 // see above
if gen == 0 {
if err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return generationCondition{}, err
}
result.ifGenerationNotMatch = &gen
}

return nil
return result, nil
}

func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
Expand All @@ -225,7 +209,7 @@ func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -271,7 +255,7 @@ func (s *Server) signedUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -339,8 +323,12 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
objName = metadata.Name
}

if resp := s.checkUploadPreconditions(r, bucketName, objName); resp != nil {
return *resp
conditions, err := s.wrapUploadPreconditions(r, bucketName, objName)
if err != nil {
return jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}

obj := StreamingObject{
Expand All @@ -354,7 +342,8 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
},
Content: notImplementedSeeker{io.NopCloser(io.MultiReader(partReaders...))},
}
obj, err = s.createObject(obj)

obj, err = s.createObject(obj, conditions)
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -490,7 +479,7 @@ func (s *Server) uploadFileContent(r *http.Request) jsonResponse {
}
if commit {
s.uploads.Delete(uploadID)
streamingObject, err := s.createObject(obj.StreamingObject())
streamingObject, err := s.createObject(obj.StreamingObject(), backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
72 changes: 72 additions & 0 deletions fakestorage/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -389,6 +390,77 @@ func TestServerClientObjectOperationsFailureToWriteExistingObject(t *testing.T)
})
}

func TestServerClientUploadRacesAreOnlyWonByOne(t *testing.T) {
const (
bucketName = "some-bucket"
repetitions = 10
parallelism = 5
)

type workerResult struct {
success bool
worker uint16
}

runServersTest(t, runServersOptions{enableFSBackend: true}, func(t *testing.T, server *Server) {
bucket := server.Client().Bucket(bucketName)
if err := bucket.Create(context.Background(), "my-project", nil); err != nil {
t.Fatal(err)
}

// Repeat test to increase chance of detecting race
for i := 0; i < repetitions; i++ {
objHandle := bucket.Object(fmt.Sprintf("object-%d.bin", i))

results := make(chan workerResult)
for j := uint16(0); j < parallelism; j++ {
go func(workerIndex uint16) {
writer := objHandle.If(storage.Conditions{DoesNotExist: true}).NewWriter(context.Background())
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, workerIndex)
writer.Write(buf)
results <- workerResult{success: writer.Close() == nil, worker: workerIndex}
}(j)
}

var successes int
var failures int
var winner uint16
for j := 0; j < parallelism; j++ {
result := <-results
if result.success {
successes++
winner = result.worker
} else {
failures++
}
}

if successes != 1 {
t.Errorf("in attempt %d, expected 1 success but got %d", i, successes)
}
if failures != parallelism-1 {
t.Errorf("in attempt %d, expected %d failures but got %d", i, parallelism-1, failures)
}
reader, err := objHandle.NewReader(context.Background())
if err != nil {
t.Errorf("in attempt %d, readback failed with %#v", i, err)
}
buf := make([]byte, 2)
l, err := reader.Read(buf)
if err != nil {
t.Errorf("in attempt %d, readback.read failed with %#v", i, err)
}
if l != 2 {
t.Errorf("in attempt %d, insufficient bytes read", i)
}
if winner != binary.BigEndian.Uint16(buf) {
t.Errorf("in attempt %d, %d were told as winner, but %d actually stored", i, winner, binary.BigEndian.Uint16(buf))
}
}
})
}

func TestServerClientObjectWriterBucketNotFound(t *testing.T) {
runServersTest(t, runServersOptions{}, func(t *testing.T, server *Server) {
client := server.Client()
Expand Down
6 changes: 3 additions & 3 deletions internal/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func shouldError(t *testing.T, err error) {

func uploadAndCompare(t *testing.T, storage Storage, obj Object) int64 {
isFSStorage := reflect.TypeOf(storage) == reflect.TypeOf(&storageFS{})
newObject, err := storage.CreateObject(obj.StreamingObject())
newObject, err := storage.CreateObject(obj.StreamingObject(), NoConditions{})
if isFSStorage && obj.Generation != 0 {
t.Log("FS should not support objects generation")
shouldError(t, err)
obj.Generation = 0
newObject, err = storage.CreateObject(obj.StreamingObject())
newObject, err = storage.CreateObject(obj.StreamingObject(), NoConditions{})
}
noError(t, err)
newObject.Close()
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestObjectQueryErrors(t *testing.T) {
},
Content: []byte("random-content"),
}
obj, err := storage.CreateObject(validObject.StreamingObject())
obj, err := storage.CreateObject(validObject.StreamingObject(), NoConditions{})
noError(t, err)
obj.Close()
_, err = storage.GetObjectWithGeneration(validObject.BucketName, validObject.Name, 33333)
Expand Down
Loading

0 comments on commit 4e828df

Please sign in to comment.