Skip to content

Commit

Permalink
Merge pull request #42 from ahouene/prefix-marker-fix
Browse files Browse the repository at this point in the history
Fix global prefix not working with update marker
  • Loading branch information
ahouene authored Jun 29, 2023
2 parents f18c634 + 1b35697 commit a0dd21b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 17 deletions.
2 changes: 1 addition & 1 deletion backends/s3/marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (b *Backend) setMarker(ctx context.Context, name, etag string, isDel bool)
}
nanos := time.Now().UnixNano()
s := fmt.Sprintf("%s:%s:%d:%v", name, etag, nanos, isDel)
_, err := b.doStore(ctx, UpdateMarkerFilename, []byte(s))
_, err := b.doStore(ctx, b.markerName, []byte(s))
if err != nil {
return err
}
Expand Down
37 changes: 24 additions & 13 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,24 @@ func (o Options) Check() error {
}

type Backend struct {
opt Options
config *minio.Options
client *minio.Client
log logr.Logger
opt Options
config *minio.Options
client *minio.Client
log logr.Logger
markerName string

mu sync.Mutex
lastMarker string
lastList simpleblob.BlobList
lastTime time.Time
}

func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
// Prepend global prefix
prefix = b.prependGlobalPrefix(prefix)
func (b *Backend) List(ctx context.Context, prefix string) (blobList simpleblob.BlobList, err error) {
// Handle global prefix
combinedPrefix := b.prependGlobalPrefix(prefix)

if !b.opt.UseUpdateMarker {
return b.doList(ctx, prefix)
return b.doList(ctx, combinedPrefix)
}

m, err := b.Load(ctx, UpdateMarkerFilename)
Expand All @@ -144,7 +145,7 @@ func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList,
return blobs.WithPrefix(prefix), nil
}

blobs, err = b.doList(ctx, "") // We want to cache all, so no prefix
blobs, err = b.doList(ctx, b.opt.GlobalPrefix) // We want to cache all, so no prefix
if err != nil {
return nil, err
}
Expand All @@ -162,7 +163,9 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
var blobs simpleblob.BlobList

// Runes to strip from blob names for GlobalPrefix
gpEndRune := len(b.opt.GlobalPrefix)
// This is fine, because we can trust the API to only return with the prefix.
// TODO: trust but verify
gpEndIndex := len(b.opt.GlobalPrefix)

objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
Prefix: prefix,
Expand All @@ -177,14 +180,14 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis

metricCalls.WithLabelValues("list").Inc()
metricLastCallTimestamp.WithLabelValues("list").SetToCurrentTime()
if obj.Key == UpdateMarkerFilename {
if obj.Key == b.markerName {
continue
}

// Strip global prefix from blob
blobName := obj.Key
if gpEndRune > 0 {
blobName = blobName[gpEndRune:]
if gpEndIndex > 0 {
blobName = blobName[gpEndIndex:]
}

blobs = append(blobs, simpleblob.Blob{Name: blobName, Size: obj.Size})
Expand Down Expand Up @@ -379,10 +382,18 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
client: client,
log: log,
}
b.setGlobalPrefix(opt.GlobalPrefix)

return b, nil
}

// setGlobalPrefix updates the global prefix in b and the cached marker name,
// so it can be dynamically changed in tests.
func (b *Backend) setGlobalPrefix(prefix string) {
b.opt.GlobalPrefix = prefix
b.markerName = b.prependGlobalPrefix(UpdateMarkerFilename)
}

// convertMinioError takes an error, possibly a minio.ErrorResponse
// and turns it into a well known error when possible.
// If error is not well known, it is returned as is.
Expand Down
19 changes: 16 additions & 3 deletions backends/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
}
}
// This one is not returned by the List command
err = b.client.RemoveObject(ctx, b.opt.Bucket, UpdateMarkerFilename, minio.RemoveObjectOptions{})
err = b.client.RemoveObject(ctx, b.opt.Bucket, b.markerName, minio.RemoveObjectOptions{})
require.NoError(t, err)
}
t.Cleanup(cleanup)
Expand Down Expand Up @@ -107,10 +107,23 @@ func TestBackend_globalprefix(t *testing.T) {
defer cancel()

b := getBackend(ctx, t)
b.opt.GlobalPrefix = "v5/"
b.setGlobalPrefix("v5/")

tester.DoBackendTests(t, b)
assert.Len(t, b.lastMarker, 0)
assert.Empty(t, b.lastMarker)
}

func TestBackend_globalPrefixAndMarker(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Start the backend over
b := getBackend(ctx, t)
b.setGlobalPrefix("v6/")
b.opt.UseUpdateMarker = true

tester.DoBackendTests(t, b)
assert.NotEmpty(t, b.lastMarker)
}

func TestBackend_recursive(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/PowerDNS/simpleblob"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// DoBackendTests tests a backend for conformance
Expand Down Expand Up @@ -41,6 +42,7 @@ func DoBackendTests(t *testing.T, b simpleblob.Interface) {
ls, err = b.List(ctx, "foo-")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"foo-1"})
require.NotEmpty(t, ls)
assert.Equal(t, ls[0].Size, int64(3))
ls, err = b.List(ctx, "bar-")
assert.NoError(t, err)
Expand Down

0 comments on commit a0dd21b

Please sign in to comment.