Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race-free Preconditions #935

Merged
merged 5 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions fakestorage/json_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"os"
"syscall"

"github.com/fsouza/fake-gcs-server/internal/backend"
)

type jsonResponse struct {
Expand Down Expand Up @@ -63,5 +65,8 @@ func errToJsonResponse(err error) jsonResponse {
if errors.As(err, &pathError) && pathError.Err == syscall.ENAMETOOLONG {
status = http.StatusBadRequest
}
if err == backend.PreConditionFailed {
status = http.StatusPreconditionFailed
}
return jsonResponse{errorMessage: err.Error(), status: status}
}
10 changes: 5 additions & 5 deletions fakestorage/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,15 @@ func (s *Server) CreateObject(obj Object) {
// If the bucket within the object doesn't exist, it also creates it. If the
// object already exists, it overwrites the object.
func (s *Server) CreateObjectStreaming(obj StreamingObject) error {
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return err
}
obj.Close()
return nil
}

func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
func (s *Server) createObject(obj StreamingObject, conditions backend.Conditions) (StreamingObject, error) {
oldBackendObj, err := s.backend.GetObject(obj.BucketName, obj.Name)
// Calling Close before checking err is okay on objects, and the object
// may need to be closed whether or not there's an error.
Expand All @@ -306,7 +306,7 @@ func (s *Server) createObject(obj StreamingObject) (StreamingObject, error) {
prevVersionExisted := err == nil

// The caller is responsible for closing the created object.
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0])
newBackendObj, err := s.backend.CreateObject(toBackendObjects([]StreamingObject{obj})[0], conditions)
if err != nil {
return StreamingObject{}, err
}
Expand Down Expand Up @@ -680,7 +680,7 @@ func (s *Server) setObjectACL(r *http.Request) jsonResponse {
Role: role,
}}

obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -735,7 +735,7 @@ func (s *Server) rewriteObject(r *http.Request) jsonResponse {
Content: obj.Content,
}

created, err := s.createObject(newObject)
created, err := s.createObject(newObject, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
87 changes: 38 additions & 49 deletions fakestorage/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"cloud.google.com/go/storage"
"github.com/fsouza/fake-gcs-server/internal/backend"
"github.com/fsouza/fake-gcs-server/internal/checksum"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -46,6 +47,21 @@ type contentRange struct {
Total int // Total bytes expected, -1 if unknown
}

type generationCondition struct {
ifGenerationMatch *int64
ifGenerationNotMatch *int64
}

func (c generationCondition) ConditionsMet(activeGeneration int64) bool {
if c.ifGenerationMatch != nil && *c.ifGenerationMatch != activeGeneration {
return false
}
if c.ifGenerationNotMatch != nil && *c.ifGenerationNotMatch == activeGeneration {
return false
}
return true
}

func (s *Server) insertObject(r *http.Request) jsonResponse {
bucketName := unescapeMuxVars(mux.Vars(r))["bucketName"]

Expand Down Expand Up @@ -136,72 +152,40 @@ func (s *Server) insertFormObject(r *http.Request) xmlResponse {
},
Content: infile,
}
obj, err = s.createObject(obj)
obj, err = s.createObject(obj, backend.NoConditions{})
if err != nil {
return xmlResponse{errorMessage: err.Error()}
}
defer obj.Close()
return xmlResponse{status: http.StatusNoContent}
}

func (s *Server) checkUploadPreconditions(r *http.Request, bucketName string, objectName string) *jsonResponse {
func (s *Server) wrapUploadPreconditions(r *http.Request, bucketName string, objectName string) (generationCondition, error) {
result := generationCondition{
ifGenerationMatch: nil,
ifGenerationNotMatch: nil,
}
ifGenerationMatch := r.URL.Query().Get("ifGenerationMatch")

if ifGenerationMatch != "" {
gen, err := strconv.ParseInt(ifGenerationMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
if gen == 0 {
_, err := s.backend.GetObject(bucketName, objectName)
if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if _, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen); err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return generationCondition{}, err
}
return nil
result.ifGenerationMatch = &gen
}

ifGenerationNotMatch := r.URL.Query().Get("ifGenerationNotMatch")

if ifGenerationNotMatch != "" {
gen, err := strconv.ParseInt(ifGenerationNotMatch, 10, 64)
if err != nil {
return &jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}
obj, err := s.backend.GetObjectWithGeneration(bucketName, objectName, gen)
// Calling Close before checking err is okay on objects, and the return
// path below is complicated.
defer obj.Close() //lint:ignore SA5001 // see above
if gen == 0 {
if err != nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
}
} else if err == nil {
return &jsonResponse{
status: http.StatusPreconditionFailed,
errorMessage: "Precondition failed",
}
return generationCondition{}, err
}
result.ifGenerationNotMatch = &gen
}

return nil
return result, nil
}

func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
Expand All @@ -225,7 +209,7 @@ func (s *Server) simpleUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -271,7 +255,7 @@ func (s *Server) signedUpload(bucketName string, r *http.Request) jsonResponse {
},
Content: notImplementedSeeker{r.Body},
}
obj, err := s.createObject(obj)
obj, err := s.createObject(obj, backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -339,8 +323,12 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
objName = metadata.Name
}

if resp := s.checkUploadPreconditions(r, bucketName, objName); resp != nil {
return *resp
conditions, err := s.wrapUploadPreconditions(r, bucketName, objName)
if err != nil {
return jsonResponse{
status: http.StatusBadRequest,
errorMessage: err.Error(),
}
}

obj := StreamingObject{
Expand All @@ -354,7 +342,8 @@ func (s *Server) multipartUpload(bucketName string, r *http.Request) jsonRespons
},
Content: notImplementedSeeker{io.NopCloser(io.MultiReader(partReaders...))},
}
obj, err = s.createObject(obj)

obj, err = s.createObject(obj, conditions)
if err != nil {
return errToJsonResponse(err)
}
Expand Down Expand Up @@ -490,7 +479,7 @@ func (s *Server) uploadFileContent(r *http.Request) jsonResponse {
}
if commit {
s.uploads.Delete(uploadID)
streamingObject, err := s.createObject(obj.StreamingObject())
streamingObject, err := s.createObject(obj.StreamingObject(), backend.NoConditions{})
if err != nil {
return errToJsonResponse(err)
}
Expand Down
72 changes: 72 additions & 0 deletions fakestorage/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -389,6 +390,77 @@ func TestServerClientObjectOperationsFailureToWriteExistingObject(t *testing.T)
})
}

func TestServerClientUploadRacesAreOnlyWonByOne(t *testing.T) {
const (
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use runServersTest to make sure we're testing both backends?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I worried a little bit about test-performance, given that this test is trying to provoke a race, therefor running multiple iterations. However, I think perhaps we could tune down repetitions slightly to compensate. Attaching test-durations from my machine below.

Also pushing a small improvement in test-diagnostics. On conflict, the test will now describe the conflict in more detail.

ok github.com/fsouza/fake-gcs-server 0.016s
ok github.com/fsouza/fake-gcs-server/fakestorage 1.176s
ok github.com/fsouza/fake-gcs-server/internal/backend 0.031s
ok github.com/fsouza/fake-gcs-server/internal/checksum 0.010s
ok github.com/fsouza/fake-gcs-server/internal/config 0.010s
ok github.com/fsouza/fake-gcs-server/internal/notification 0.026s

bucketName = "some-bucket"
repetitions = 10
parallelism = 5
)

type workerResult struct {
success bool
worker uint16
}

runServersTest(t, runServersOptions{enableFSBackend: true}, func(t *testing.T, server *Server) {
bucket := server.Client().Bucket(bucketName)
if err := bucket.Create(context.Background(), "my-project", nil); err != nil {
t.Fatal(err)
}

// Repeat test to increase chance of detecting race
for i := 0; i < repetitions; i++ {
objHandle := bucket.Object(fmt.Sprintf("object-%d.bin", i))

results := make(chan workerResult)
for j := uint16(0); j < parallelism; j++ {
go func(workerIndex uint16) {
writer := objHandle.If(storage.Conditions{DoesNotExist: true}).NewWriter(context.Background())
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, workerIndex)
writer.Write(buf)
results <- workerResult{success: writer.Close() == nil, worker: workerIndex}
}(j)
}

var successes int
var failures int
var winner uint16
for j := 0; j < parallelism; j++ {
result := <-results
if result.success {
successes++
winner = result.worker
} else {
failures++
}
}

if successes != 1 {
t.Errorf("in attempt %d, expected 1 success but got %d", i, successes)
}
if failures != parallelism-1 {
t.Errorf("in attempt %d, expected %d failures but got %d", i, parallelism-1, failures)
}
reader, err := objHandle.NewReader(context.Background())
if err != nil {
t.Errorf("in attempt %d, readback failed with %#v", i, err)
}
buf := make([]byte, 2)
l, err := reader.Read(buf)
if err != nil {
t.Errorf("in attempt %d, readback.read failed with %#v", i, err)
}
if l != 2 {
t.Errorf("in attempt %d, insufficient bytes read", i)
}
if winner != binary.BigEndian.Uint16(buf) {
t.Errorf("in attempt %d, %d were told as winner, but %d actually stored", i, winner, binary.BigEndian.Uint16(buf))
}
}
})
}

func TestServerClientObjectWriterBucketNotFound(t *testing.T) {
runServersTest(t, runServersOptions{}, func(t *testing.T, server *Server) {
client := server.Client()
Expand Down
6 changes: 3 additions & 3 deletions internal/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func shouldError(t *testing.T, err error) {

func uploadAndCompare(t *testing.T, storage Storage, obj Object) int64 {
isFSStorage := reflect.TypeOf(storage) == reflect.TypeOf(&storageFS{})
newObject, err := storage.CreateObject(obj.StreamingObject())
newObject, err := storage.CreateObject(obj.StreamingObject(), NoConditions{})
if isFSStorage && obj.Generation != 0 {
t.Log("FS should not support objects generation")
shouldError(t, err)
obj.Generation = 0
newObject, err = storage.CreateObject(obj.StreamingObject())
newObject, err = storage.CreateObject(obj.StreamingObject(), NoConditions{})
}
noError(t, err)
newObject.Close()
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestObjectQueryErrors(t *testing.T) {
},
Content: []byte("random-content"),
}
obj, err := storage.CreateObject(validObject.StreamingObject())
obj, err := storage.CreateObject(validObject.StreamingObject(), NoConditions{})
noError(t, err)
obj.Close()
_, err = storage.GetObjectWithGeneration(validObject.BucketName, validObject.Name, 33333)
Expand Down
Loading