Skip to content

S3: Add global_prefix and prefix_folders options #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type Options struct {
// CreateBucket tells us to try to create the bucket
CreateBucket bool `yaml:"create_bucket"`

// GlobalPrefix is a prefix applied to all operations, allowing work within a prefix
// seamlessly
GlobalPrefix string `yaml:"global_prefix"`

// Recursive can be enabled to make List operations recurse into nested prefixes
Recursive bool `yaml:"recursive"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to make listing all filenames with any slashes the default, instead of requiring a per-backend opt-in.

So instead of Recursive we would provide an option to opt-out of the recursion,

Copy link
Contributor Author

@nvaatstra nvaatstra Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below commit removes Recursive and replaces it with PrefixFolders.

PrefixFolders can be enabled to make List operations show nested prefixes as folders instead of recursively listing all contents of nested prefixes.

Tests updated accordingly.

Examples - PrefixFolders
If a bucket contains the following:

- bar1
- bar2
- foo/bar3
- foo/bar4

Then a List() using PrefixFolders unset or false will yield:

- bar1
- bar2
- foo/bar3
- foo/bar4

If PrefixFolders is set to true, then a List() will yield:

- bar1
- bar2
- foo/


// EndpointURL can be set to something like "http://localhost:9000" when using Minio
// or "https://s3.amazonaws.com" for AWS S3.
EndpointURL string `yaml:"endpoint_url"`
Expand Down Expand Up @@ -110,6 +117,9 @@ type Backend struct {
}

func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
// Prepend global prefix
prefix = b.prependGlobalPrefix(prefix)

if !b.opt.UseUpdateMarker {
return b.doList(ctx, prefix)
}
Expand Down Expand Up @@ -150,9 +160,12 @@ func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList,
func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
var blobs simpleblob.BlobList

// Runes to strip from blob names for GlobalPrefix
gpEndRune := len(b.opt.GlobalPrefix)

objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: false,
Recursive: b.opt.Recursive,
})
for obj := range objCh {
// Handle error returned by MinIO client
Expand All @@ -166,7 +179,14 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
if obj.Key == UpdateMarkerFilename {
continue
}
blobs = append(blobs, simpleblob.Blob{Name: obj.Key, Size: obj.Size})

// Strip global prefix from blob
blobName := obj.Key
if gpEndRune > 0 {
blobName = blobName[gpEndRune:]
}

blobs = append(blobs, simpleblob.Blob{Name: blobName, Size: obj.Size})
}

// Minio appears to return them sorted, but maybe not all implementations
Expand All @@ -179,6 +199,9 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
// Load retrieves the content of the object identified by name from S3 Bucket
// configured in b.
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()

Expand All @@ -199,6 +222,9 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
// Store sets the content of the object identified by name to the content
// of data, in the S3 Bucket configured in b.
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

info, err := b.doStore(ctx, name, data)
if err != nil {
return err
Expand All @@ -222,6 +248,9 @@ func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.
// Delete removes the object identified by name from the S3 Bucket
// configured in b.
func (b *Backend) Delete(ctx context.Context, name string) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

if err := b.doDelete(ctx, name); err != nil {
return err
}
Expand Down Expand Up @@ -370,6 +399,12 @@ func convertMinioError(err error) error {
return nil
}

// prependGlobalPrefix prepends the GlobalPrefix to the name/prefix
// passed as input
func (b *Backend) prependGlobalPrefix(name string) string {
return b.opt.GlobalPrefix + name
}

func init() {
simpleblob.RegisterBackend("s3", func(ctx context.Context, p simpleblob.InitParams) (simpleblob.Interface, error) {
var opt Options
Expand Down
56 changes: 54 additions & 2 deletions backends/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

blobs, err := b.doList(ctx, "")
blobs, err := b.List(ctx, "")
if err != nil {
t.Logf("Blobs list error: %s", err)
return
}
for _, blob := range blobs {
err := b.client.RemoveObject(ctx, b.opt.Bucket, blob.Name, minio.RemoveObjectOptions{})
err := b.Delete(ctx, blob.Name)
if err != nil {
t.Logf("Object delete error: %s", err)
}
Expand Down Expand Up @@ -101,3 +101,55 @@ func TestBackend_marker(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, b.lastMarker, markerFileContent)
}

func TestBackend_globalprefix(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

b := getBackend(ctx, t)
b.opt.GlobalPrefix = "v5/"

tester.DoBackendTests(t, b)
assert.Len(t, b.lastMarker, 0)
}

func TestBackend_recursive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

b := getBackend(ctx, t)

// Starts empty
ls, err := b.List(ctx, "")
assert.NoError(t, err)
assert.Len(t, ls, 0)

// Add items
err = b.Store(ctx, "bar-1", []byte("bar1"))
assert.NoError(t, err)
err = b.Store(ctx, "bar-2", []byte("bar2"))
assert.NoError(t, err)
err = b.Store(ctx, "foo/bar-3", []byte("bar3"))
assert.NoError(t, err)

// List all - no recursion (default)
ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/"})

// List all - recursive enabled
b.opt.Recursive = true

ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/bar-3"})

// List all - recursive disabled
b.opt.Recursive = false

ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/"})

assert.Len(t, b.lastMarker, 0)
}